第五章:线程同步机制

  1. 1. 线程竞争
  2. 2. 互斥量
    1. 2.1. 互斥量的初始化
    2. 2.2. 对临界区加锁
    3. 2.3. 举个栗子
  3. 3. 条件变量
    1. 3.1. 条件变量的初始化
    2. 3.2. 等待条件
    3. 3.3. 发送信号
    4. 3.4. 举个栗子
  4. 4. 信号量
    1. 4.1. sem_init(3)
    2. 4.2. sem_destroy(3)
    3. 4.3. sem_wait(3)
    4. 4.4. sem_post(3)
    5. 4.5. 举个栗子【互斥量+条件变量 -> 单个信号量】
  5. 5. 互斥量 VS 条件变量 VS 信号量

主要介绍线程同步的三种方式:互斥量、条件变量、信号量

注:标题中显示的函数数字表示该函数在man手册中所在章节(第2章的是系统调用函数,第3章的是标准函数)

线程竞争

使用20个线程分别对同一个文件进行如下操作:打开,读取数据,加1,覆盖写回去,关闭文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// add.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

#define THRNUM 20
#define BUFSIZE 1024
#define FNAME "/tmp/out"

void *thr_func(void *arg)
{
FILE *fp;
char buf[BUFSIZE];

fp = fopen(FNAME, "r+");
if (fp == NULL)
{
perror("fopen()");
exit(1);
}
fgets(buf, BUFSIZE, fp);
fseek(fp, 0, SEEK_SET);
printf(" %d\n", atoi(buf) + 1);
fprintf(fp, "%d\n", atoi(buf) + 1);
fclose(fp);

pthread_exit(0);
}

int main(int argc, char **argv)
{
pthread_t tid[THRNUM];
int i, err;

for (i = 0; i < THRNUM; i++)
{
err = pthread_create(tid + i, NULL, thr_func, NULL);
if (err)
{
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

for (i = 0; i < THRNUM; i++)
{
pthread_join(tid[i], NULL);
}

exit(0);
}

某次编译运行的结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ cat /tmp/out
0
$ make add
cc -pthread -pthread add.c -o add
$ ./add
1
2
3
3
4
5
6
7
8
9
10
9
10
11
11
12
13
14
14
15
$ cat /tmp/out
15

多个线程同时对同一个文件进行读写操作,如果线程之间没有进行同步操作,可能导致文件中的数据错乱或者不确定的结果,因为对文件的读写操作不一定是原子的。

多线程的并发操作中,往往需要使用线程同步机制来保护对共享资源(如文件)的访问,以避免数据竞争和不确定的结果。

互斥量

互斥量(mutex,mutual exclusion)是一种用于线程同步的机制,它可以用来保护共享资源的访问。在多线程程序中,多个线程可能同时访问同一个共享资源,如果没有同步机制,就会导致竞争条件(race condition)的问题,从而导致程序出现不可预期的错误。互斥量可以用来解决这个问题,它可以实现对共享资源的互斥访问,保证同一时刻只有一个线程可以访问共享资源。通俗点说,互斥量就是限制一段代码以读的形式实现

互斥量可以看作是一个特殊的变量,它有两种状态:锁定(locked)和未锁定(unlocked),只有获得锁的线程才能访问共享资源,其他线程需要等待锁的释放才能继续执行。一个线程可以通过尝试加锁来获得互斥量,如果互斥量当前处于未锁定状态,则该线程可以获得锁并进入临界区;如果互斥量已经被锁定,则该线程会被阻塞等待互斥量的释放。当线程进入临界区时,互斥量被锁定;当线程离开临界区时,互斥量被释放。

基本用法如下:

1.定义互斥量变量

1
pthread_mutex_t mutex;

2.初始化互斥量

1
pthread_mutex_init(&mutex, NULL);

3.对临界区加锁

1
2
3
pthread_mutex_lock(&mutex);
// 访问共享资源
pthread_mutex_unlock(&mutex);

4.销毁互斥量

1
pthread_mutex_destroy(&mutex);

互斥量的另一个重要特性是递归锁,也叫做可重入锁。当一个线程已经获得了互斥量的锁,并在临界区内执行时,如果再次尝试获得该互斥量的锁,则不会被阻塞,而是继续执行。这种情况下,互斥量被称为递归锁,因为同一个线程可以多次获得该锁。

互斥量的初始化

动态初始化和静态初始化放在不同环境中使用,如果是凭空定义出来的变量,用静态初始化更简单些,直接使用默认属性就行,如果当前互斥量是位于结构体中的,用动态方式初始化。

1
2
3
4
5
6
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 静态初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr); // 动态初始化
int pthread_mutex_destroy(pthread_mutex_t *mutex);

对临界区加锁

把同一时刻只能一个线程来访问的那部分共享资源称为临界区。在进入临界区前lock,退出临界区后unlock,互斥量锁住的是一段代码,而非仅仅指一个变量、函数之类的。

1
2
3
4
5
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

注:pthread_mutex_lock()是阻塞的,pthread_mutex_trylock()是非阻塞的。

举个栗子

前面的add.c程序,如果是20个线程同时只读是ok的,但是如果这段代码同时运行着20份,某个线程在执行读操作时,别的线程在执行写操作,那就可能出错。

上述add.c程序加入互斥量机制后的结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// add_mutex.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

#define THRNUM 20
#define BUFSIZE 1024
#define FNAME "/tmp/out"

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *thr_func(void *arg)
{
FILE *fp;
char buf[BUFSIZE];

pthread_mutex_lock(&mutex);
fp = fopen(FNAME, "r+");
if (fp == NULL)
{
perror("fopen()");
exit(1);
}
fgets(buf, BUFSIZE, fp);
fseek(fp, 0, SEEK_SET);
// printf(" %d\n", atoi(buf) + 1);
fprintf(fp, "%d\n", atoi(buf) + 1);
fclose(fp);
pthread_mutex_unlock(&mutex);

pthread_exit(0);
}

int main(int argc, char **argv)
{
pthread_t tid[THRNUM];
int i, err;

for (i = 0; i < THRNUM; i++)
{
err = pthread_create(tid + i, NULL, thr_func, NULL);
if (err)
{
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

for (i = 0; i < THRNUM; i++)
{
pthread_join(tid[i], NULL);
}

pthread_mutex_destroy(&mutex);
exit(0);
}

编译运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ echo 0 > /tmp/out
$ cat /tmp/out
0
$ make add_mutex
cc -pthread -pthread add_mutex.c -o add_mutex
$ ./add_mutex
$ cat /tmp/out
20
$ ./add_mutex
$ cat /tmp/out
40
$ ./add_mutex
$ cat /tmp/out
60

条件变量

在线程同步中,条件变量是一种用于线程间通信的机制。它是一种同步原语,用于在一个或多个线程等待某个条件的发生时,使其进入阻塞状态,等待其他线程发出条件变量的信号,使其解除阻塞。

条件变量通常与互斥锁结合使用,以实现线程间的同步和互斥。通常,一个或多个线程在等待某个条件的发生时,会先获得互斥锁,然后通过条件变量进入阻塞状态。在其他线程满足条件时,会发送信号给条件变量,然后解除阻塞状态,继续执行。

条件变量本身不是一个锁,它只是一个等待队列,用于线程之间的通信。线程可以在条件变量上等待某个条件的发生,当条件满足时,其他线程可以通过条件变量来通知等待的线程,从而使得等待的线程重新开始执行。

在使用条件变量时,通常需要注意以下几点:

  • 需要先获得互斥锁,然后再使用条件变量。这可以避免多个线程同时访问共享变量,造成数据不一致的问题。
  • 等待条件时,需要在 while 循环中检查条件是否满足,以避免虚假唤醒的问题。
  • 发送信号时,可以使用 signal 或 broadcast 两种方式。signal 只会唤醒一个等待该条件的线程,而 broadcast 则会唤醒所有等待该条件的线程。

通常,使用条件变量需要以下步骤:

1.定义条件变量和互斥量变量

1
2
pthread_cond_t cond;
pthread_mutex_t mutex;

2.初始化条件变量和互斥量

1
2
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);

3.在等待条件时使用条件变量和互斥量

1
2
3
4
5
6
pthread_mutex_lock(&mutex);
while (!condition) {
pthread_cond_wait(&cond, &mutex);
}
// 条件已满足,执行相应操作
pthread_mutex_unlock(&mutex);

4.在条件满足时使用条件变量和互斥量发送信号

1
2
3
4
pthread_mutex_lock(&mutex);
condition = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

5.销毁条件变量和互斥量

1
2
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);

需要注意的是,在使用条件变量时,需要避免竞争条件的问题,并且要确保发送信号的线程和等待条件的线程使用的是同一个互斥量。否则,可能会导致死锁或其他错误。

1
2
3
4
5
6
7
8
9
10
11
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
// ...
pthread_mutex_lock(&mutex);
while (condition_not_satisfied) {
pthread_cond_wait(&cond, &mutex);
}
// ...
pthread_mutex_unlock(&mutex);

条件变量的初始化

NAME

pthread_cond_destroy, pthread_cond_init — destroy and initialize condition variables

SYNOPSIS

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_init(pthread_cond_t *restrict cond,

const pthread_condattr_t *restrict attr);

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

RETURN VALUE

If successful, the pthread_cond_destroy() and pthread_cond_init() functions shall return zero; otherwise, an error number shall be returned to indicate the error.

等待条件

NAME

pthread_cond_timedwait, pthread_cond_wait — wait on a condition

SYNOPSIS

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,

pthread_mutex_t *restrict mutex,

const struct timespec *restrict abstime);

int pthread_cond_wait(pthread_cond_t *restrict cond,

pthread_mutex_t *restrict mutex);

RETURN VALUE

Except for [ETIMEDOUT], [ENOTRECOVERABLE], and [EOWNERDEAD], all these error checks shall act as if they were performed immediately at the beginning of processing for the function and shall cause an error return, in effect, prior to modifying the state of the mutex specified by mutex or the condition variable specified by cond.

Upon successful completion, a value of zero shall be returned; otherwise, an error number shall be returned to indicate the error.

pthread_cond_wait:让当前线程等待在条件变量 cond 上。该函数需要传递两个参数:一个 pthread_cond_t 类型的指针,表示需要等待的条件变量;一个 pthread_mutex_t 类型的指针,表示需要使用的互斥锁。该函数会自动释放互斥锁,并将当前线程置于等待条件的队列中,直到收到信号或被中断为止。【解锁等待

如果多个线程等待同一个条件变量,当条件变量被 pthread_cond_signal() 唤醒时,只有一个线程会被唤醒;当条件变量被 pthread_cond_broadcast() 唤醒时,所有等待的线程都会被唤醒。此函数返回 0 表示成功,其他值表示出现错误。

发送信号

在多线程编程中,线程之间可能需要同步,例如等待某个条件变量(cond)满足才能继续执行。当一个线程发现条件满足时,它可以通过调用 pthread_cond_signal() 或者 pthread_cond_broadcast() ,向正在等待该条件变量的其他线程发送信号。

NAME

pthread_cond_broadcast, pthread_cond_signal — broadcast or signal a condition

SYNOPSIS

#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cond);

int pthread_cond_signal(pthread_cond_t *cond);

DESCRIPTION

These functions shall unblock threads blocked on a condition variable.

The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond.

The pthread_cond_signal() function shall unblock at least one of the threads that are blocked on the specified condition variable cond (if any threads are blocked on cond).

If more than one thread is blocked on a condition variable, the scheduling policy shall determine the order in which threads are unblocked. When each thread unblocked as a result of a pthread_cond_broadcast() or pthread_cond_signal() returns from its call to pthread_cond_wait() or pthread_cond_timedwait(), the thread shall own the mutex with which it called pthread_cond_wait() or pthread_cond_timedwait(). The thread(s) that are unblocked shall contend for the mutex according to the scheduling policy (if applicable), and as if each had called pthread_mutex_lock().

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

The pthread_cond_broadcast() and pthread_cond_signal() functions shall have no effect if there are no threads currently blocked on cond.

The behavior is undefined if the value specified by the cond argument to pthread_cond_broadcast() or pthread_cond_signal() does not refer to an initialized condition variable.

RETURN VALUE

If successful, the pthread_cond_broadcast() and pthread_cond_signal() functions shall return zero; otherwise, an error number shall be returned to indicate the error.

pthread_cond_signal()是唤醒一个等待,pthread_cond_broadcast()唤醒多个等待。

举个栗子

在一个程序中创建四个线程,使之分别向终端输出a,b,c,d,要求能连续的输出abcd字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// abcd.c

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#define THRNUM 4

static int num = 0;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static int next(int n)
{
if (n + 1 == THRNUM)
return 0;
return n + 1;
}

static void *thread_func(void *p)
{
int n = (intptr_t)p;
int ch = 'a' + n;

while (1)
{
pthread_mutex_lock(&mut);
while (num != n)
pthread_cond_wait(&cond, &mut);
write(1, &ch, 1);
num = next(num);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mut);
}

pthread_exit(NULL);
}

int main()
{
int i, err;
pthread_t tid[THRNUM];

for (i = 0; i < THRNUM; i++)
{
err = pthread_create(tid + i, NULL, thread_func, (void *)(intptr_t)i);
if (err)
{
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

alarm(2);
for (i = 0; i < THRNUM; i++)
{
pthread_join(tid[i], NULL);
}

pthread_mutex_destroy(&mut);
pthread_cond_destroy(&cond);

exit(0);
}

信号量

当资源在一定范围内可以共享时,互斥量就不太好用了。互斥量是以独占的方式来使用。

信号量(Semaphore)是一个同步原语,用于控制多线程或多进程对共享资源的访问。信号量是一个整数值,通常表示共享资源的可用数量。信号量提供了两个基本操作:等待(wait,又称为P操作)和释放(post,又称为V操作)。

  1. 等待(wait,P操作): 当一个线程或进程执行等待操作时,信号量的值减一。如果信号量的值在操作之后为负数,执行等待操作的线程将被阻塞,直到信号量的值变为非负数。
  2. 释放(post,V操作): 当一个线程或进程执行释放操作时,信号量的值加一。如果有其他线程或进程在等待信号量,其中一个被阻塞的线程将被唤醒,并允许继续执行。

信号量可以用于实现多种同步和协调机制,例如:

  1. 互斥访问共享资源: 当信号量的初始值设置为1时,它可以用于实现互斥(mutex)。在这种情况下,执行等待操作的线程将获得对共享资源的独占访问,而其他尝试访问该资源的线程将被阻塞,直到释放操作执行。
  2. 同步线程或进程: 当信号量的初始值设置为0时,它可以用于同步线程或进程。例如,一个线程可以在完成某个任务后执行释放操作,而另一个线程可以执行等待操作,等待任务完成。这样可以确保第二个线程在第一个线程完成任务之后才开始执行。
  3. 限制对共享资源的并发访问: 当信号量的初始值设置为大于1的整数时,它可以用于限制对共享资源的并发访问。这种情况下,信号量的值表示可以同时访问共享资源的线程或进程的最大数量。

信号量在不同的系统和编程语言中有不同的实现。例如,在Unix系统中,信号量可以通过System V IPC或POSIX IPC实现。在C语言中,可以使用POSIX线程库(pthread)提供的信号量函数,如sem_initsem_waitsem_postsem_destroy

注:下面函数的参数 sem 都是指向已初始化的 sem_t 类型变量的指针

1.int sem_init(sem_t *sem, int pshared, unsigned int value); 函数 sem_init 用于初始化一个信号量。sem 用于存储初始化后的信号量;参数 pshared 决定信号量是用于线程间同步(设置为0)还是进程间同步(设置为非0);参数 value 是信号量的初始值。

成功时,函数返回 0;失败时返回 -1,并设置 errno。

2.int sem_destroy(sem_t *sem); 函数 sem_destroy 用于销毁一个信号量,释放其相关资源。

成功时,函数返回 0;失败时返回 -1,并设置 errno。

3.int sem_wait(sem_t *sem); 函数 sem_wait 用于以原子操作的方式减少信号量的值。如果信号量的值大于 0,它将减少信号量的值并立即返回。如果信号量的值为 0,则调用线程将阻塞,直到信号量的值变为正数。

成功时,函数返回 0;失败时返回 -1,并设置 errno。

4.int sem_post(sem_t *sem); 函数 sem_post 用于以原子操作的方式增加信号量的值。如果有其他线程因为 sem_wait 被阻塞在该信号量上,它们将被唤醒。

成功时,函数返回 0;失败时返回 -1,并设置 errno。

sem_init(3)

NAME

sem_init - initialize an unnamed semaphore

SYNOPSIS

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

Link with -pthread.

DESCRIPTION

sem_init() initializes the unnamed semaphore at the address pointed to by sem. The value argument specifies the initial value for the semaphore.

The pshared argument indicates whether this semaphore is to be shared between the threads of a process, or between processes.

If pshared has the value 0, then the semaphore is shared between the threads of a process, and should be located at some address that is visible to all threads (e.g., a global variable, or a variable allocated dynamically on the heap).

If pshared is nonzero, then the semaphore is shared between processes, and should be located in a region of shared memory (see shm_open(3), mmap(2), and shmget(2)). (Since a child created by fork(2) inherits its parent’s memory mappings, it can also access the semaphore.) Any process that can access the shared memory region can operate on the semaphore using sem_post(3), sem_wait(3), and so on.

Initializing a semaphore that has already been initialized results in undefined behavior.

RETURN VALUE

sem_init() returns 0 on success; on error, -1 is returned, and errno is set to indicate the error.

sem_destroy(3)

NAME

sem_destroy - destroy an unnamed semaphore

SYNOPSIS

#include <semaphore.h>

int sem_destroy(sem_t *sem);

Link with -pthread.

DESCRIPTION

sem_destroy() destroys the unnamed semaphore at the address pointed to by sem.

Only a semaphore that has been initialized by sem_init(3) should be destroyed using sem_destroy().

Destroying a semaphore that other processes or threads are currently blocked on (in sem_wait(3)) produces undefined behavior.

Using a semaphore that has been destroyed produces undefined results, until the semaphore has been reinitialized using sem_init(3).

RETURN VALUE

sem_destroy() returns 0 on success; on error, -1 is returned, and errno is set to indicate the error.

ERRORS

EINVAL sem is not a valid semaphore.

sem_wait(3)

NAME

sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS

#include <semaphore.h>

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

Link with -pthread.

DESCRIPTION

sem_wait() decrements (locks) the semaphore pointed to by sem. If the semaphore’s value is greater than zero, then the decrement proceeds, and the function returns, immediately. If the semaphore currently has the value zero, then the call blocks until either it becomes possible to perform the decrement (i.e., the semaphore value rises above zero), or a signal handler interrupts the call.

sem_trywait() is the same as sem_wait(), except that if the decrement cannot be immediately performed, then call returns an error (errno set to EAGAIN) instead of blocking.

sem_timedwait() is the same as sem_wait(), except that abs_timeout specifies a limit on the amount of time that the call should block if the decrement cannot be immediately performed. The abs_timeout argument points to a structure that specifies an absolute timeout in seconds and nanoseconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC). This structure is defined as follows:

struct timespec {

time_t tv_sec; /* Seconds */

long tv_nsec; /* Nanoseconds [0 .. 999999999] */

};

If the timeout has already expired by the time of the call, and the semaphore could not be locked immediately, then sem_timedwait() fails with a timeout error (errno set to ETIMEDOUT).

If the operation can be performed immediately, then sem_timedwait() never fails with a timeout error, regardless of the value of abs_timeout. Furthermore, the validity of abs_timeout is not checked in this case.

RETURN VALUE

All of these functions return 0 on success; on error, the value of the semaphore is left unchanged, -1 is returned, and errno is set to indicate the error.

sem_post(3)

NAME

sem_post - unlock a semaphore

SYNOPSIS

#include <semaphore.h>

int sem_post(sem_t *sem);

Link with -pthread.

DESCRIPTION

sem_post() increments (unlocks) the semaphore pointed to by sem. If the semaphore’s value consequently becomes greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed to lock the semaphore. 释放信号量,将信号量的值加 1.

RETURN VALUE

sem_post() returns 0 on success; on error, the value of the semaphore is left unchanged, -1 is returned, and errno is set to indicate the error.

举个栗子【互斥量+条件变量 -> 单个信号量】

使用互斥量+条件变量完成一个可以记次数(有资源上限)的资源共享。使用信号量机制重构之前的计算质数的程序(primer.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// mysem.h

#ifndef MYSEM_H__
#define MYSEM_H__

typedef void mysem_t;

mysem_t *mysem_init(int initval);

int mysem_add(mysem_t *, int );

int mysem_sub(mysem_t *, int );

int mysem_destroy(mysem_t *);

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// mysem.c

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "mysem.h"

// 定义信号量结构体
struct mysem_st
{
int value;
pthread_mutex_t mut;
pthread_cond_t cond;
};

// 定义信号量初始值
mysem_t *mysem_init(int initval)
{
struct mysem_st *me;

me = malloc(sizeof(*me));
if (me == NULL)
return NULL;
me->value = initval;
pthread_mutex_init(&me->mut, NULL);
pthread_cond_init(&me->cond, NULL);

return me;
}


int mysem_add(mysem_t *ptr, int n)
{
struct mysem_st *me = ptr;

pthread_mutex_lock(&me->mut);
me->value += n;
pthread_cond_broadcast(&me->cond);
pthread_mutex_unlock(&me->mut);

return n;
}

// 应该先判断n的值大于0还是小于0;
int mysem_sub(mysem_t *ptr, int n)
{
struct mysem_st *me = ptr;

pthread_mutex_lock(&me->mut);
while (me->value < n)
pthread_cond_wait(&me->cond, &me->mut);
me->value -= n;
pthread_mutex_unlock(&me->mut);

return n;
}

int mysem_destroy(mysem_t *ptr)
{
struct mysem_st *me = ptr;

pthread_mutex_destroy(&me->mut);
pthread_cond_destroy(&me->cond);
free(me); // free(ptr);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// main.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include "mysem.h"

#define LEFT 30000000
#define RIGHT 30000200
#define THRNUM (RIGHT-LEFT+1)
#define N 4 // 线程个数(信号量的初始值)

static mysem_t *sem;

static void *thread_primer(void *p);

int main(int argc,char **argv)
{
int i,err;
pthread_t tid[THRNUM];

sem = mysem_init(N);
if(sem == NULL)
{
fprintf(stderr, "mysem_init() failed!\n");
exit(1);
}

for(i = LEFT; i <= RIGHT; i++)
{
mysem_sub(sem, 1);
err = pthread_create(tid+(i-LEFT),NULL,thread_primer,(void *)i);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit(1);
}
}
for(i = LEFT; i <= RIGHT; i++)
{
pthread_join(tid[i-LEFT],NULL);
}

mysem_destroy(sem);
exit(0);
}

static void *thread_primer(void *p)
{
int i,j,mark;
i = (int)p;

mark = 1;
for(j = 2; j <= i/2; j++)
{
if(i % j == 0)
{
mark = 0;
break;
}
}
if(mark)
printf("%d is a primer.\n",i);

sleep(5);
mysem_add(sem, 1);
pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
// makefile

CFLAGS+=-pthread
LDFLAGS+=-pthread

all:mysem

mysem:main.o mysem.o
gcc $^ -o $@ $(CFLAGS) $(LDFLAGS)

clean:
rm -rf *.o mysem

运行./mysem程序后,通过ps ax -L指令可看到类似于如下内容:

1
2
3
4
5
6
PID     LWP TTY      STAT   TIME COMMAND
5021 5021 pts/4 Sl+ 0:00 ./mysem
5021 5148 pts/4 Sl+ 0:00 ./mysem
5021 5149 pts/4 Sl+ 0:00 ./mysem
5021 5150 pts/4 Sl+ 0:00 ./mysem
5021 5151 pts/4 Sl+ 0:00 ./mysem

反复执行,5021进程一直在,后面的4个线程会变。

后面介绍进程间通信时,会介绍信号量数组,相当于这个模型的放大,如果把信号量存在数组或指针当中,就成了mytb或anytimer的一个实现套路。封装的mytb库是解决问题常用的一个框架。

补充:在vim中使用快捷键进行字符串替换操作,将全文中的mytbf字符串都替换为mysem。

1
2
3
4
5
6
7
8
9
10
CFLAGS+=-pthread
LDFLAGS+=-pthread

all:mytbf

mytbf:main.o mytbf.o
gcc $^ -o $@ $(CFLAGS) $(LDFLAGS)

clean:
rm -rf *.o mytbf

:%s/mytbf/mysem/g 将每一行中的所有mytbf替换为mysem

:%s/mytbf/mysem 将每一行中的第一个mytbf替换为mysem

:s/mytbf/mysem/g 将当前行中的所有mytbf替换为mysem

互斥量 VS 条件变量 VS 信号量

互斥量(Mutex)、条件变量(Condition Variable)和信号量(Semaphore)是用于同步和协调多线程或多进程之间的执行的常见工具。它们之间的主要区别如下:

  1. 互斥量(Mutex): 互斥量主要用于保护共享资源的访问,确保同一时刻只有一个线程或进程可以访问共享资源。互斥量的基本操作包括加锁(lock)和解锁(unlock)。当一个线程获取互斥量时,其他线程必须等待直到互斥量被释放。互斥量通常用于实现临界区,即一段只能被一个线程执行的代码。
  2. 条件变量(Condition Variable): 条件变量用于线程间的同步,它允许一个或多个线程等待满足某个条件。条件变量通常与互斥量一起使用,当一个线程需要等待某个条件时,它会解锁互斥量并阻塞在条件变量上,当条件满足时,另一个线程可以使用条件变量通知等待的线程,此时,被唤醒的线程会重新获取互斥量并继续执行。条件变量的主要操作有:等待(wait)、通知一个(signal)和通知所有(broadcast)。
  3. 信号量(Semaphore): 信号量是一个计数器,用于控制对共享资源的访问。信号量有两个基本操作:等待(wait,又称为P操作)和释放(post,又称为V操作)。当一个线程执行等待操作时,信号量的值减一,如果此时信号量的值小于零,线程会阻塞。当一个线程执行释放操作时,信号量的值加一,如果有其他线程在等待该信号量,其中一个线程将被唤醒。信号量可以用于实现互斥(当信号量的初始值为1时)和同步(当信号量的初始值为0时)。

总结:

  • 互斥量主要用于保护共享资源,确保同一时刻只有一个线程可以访问该资源。
  • 条件变量用于线程间同步,允许线程等待某个条件满足。
  • 信号量用于控制对共享资源的访问,可实现互斥和同步。

互斥量和条件变量通常用于多线程编程,而信号量可以用于多线程和多进程编程。在某些系统中,信号量实现可能基于互斥量和条件变量。