Producer-consumer problem

1.进程同步与信号量

进程同步:让多个进程"走走停停"来保证多进程合作的合理有序(以生产者消费者问题为例进行讲解)

1.1生产者消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

1.2实现

1.2.1不完善实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int itemCount = 0;

procedure producer() {
while (true) {
item = produceItem();
//缓存区满,生产者休眠
if (itemCount == BUFFER_SIZE) {
sleep();
}
putItemIntoBuffer(item);
itemCount = itemCount + 1;
//缓存区数量为1,唤醒消费者
if (itemCount == 1) {
wakeup(consumer);
}
}
}

procedure consumer() {
while (true) {
//缓存区空,消费者休眠
if (itemCount == 0) {
sleep();
}
item = removeItemFromBuffer();
itemCount = itemCount - 1;
//缓存区数量为BUFFER_SIZE - 1,唤醒生产者
if (itemCount == BUFFER_SIZE - 1) {
wakeup(producer);
}
consumeItem(item);
}
}

存在的问题:可能导致竞争条件,进而引发死锁,如以下情况

  1. 消费者把最后一个 itemCount 的内容读出来,注意它现在是零。消费者返回到while的起始处,现在进入 if 块;
  2. 就在调用sleep之前,CPU决定将时间让给生产者,于是消费者在执行 sleep 之前就被中断了,生产者开始执行;
  3. 生产者生产出一项数据后将其放入缓冲区,然后在 itemCount 上加 1;
  4. 由于缓冲区在上一步加 1 之前为空,生产者尝试唤醒消费者;
  5. 遗憾的是,消费者并没有在休眠,唤醒指令不起作用。当消费者恢复执行的时候,执行 sleep,一觉不醒。出现这种情况的原因在于,消费者只能被生产者在 itemCount 为 1 的情况下唤醒;
  6. 生产者不停地循环执行,直到缓冲区满,随后进入休眠。

如果存在多个生产者,也是不行的,itemCount含义不足以表达休眠中的生产者。会存在有些生产者一直休眠

image-20210919131635850

1.2.2采取信号灯实现

信号灯可以避免上述唤醒指令不起作用的情况。该方法使用了两个信号灯,fillCount 和 emptyCount。fillCount 用于记录缓冲区中将被读取的数据项数(实际上就是有多少数据项在缓冲区里),emptyCount 用于记录缓冲区中空闲空间数。当有新数据项被放入缓冲区时,fillCount 增加,emptyCount 减少。如果在生产者尝试减少 emptyCount 的时候发现其值为零,那么生产者就进入休眠。等到有数据项被消耗,emptyCount 增加的时候,生产者才被唤醒。消费者的行为类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
semaphore fillCount = 0; // 生产的项目
semaphore emptyCount = BUFFER_SIZE; // 剩余空间

procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
//若emptyCount为0,生产者进入休眠
//若emotyCount为负数,表示有生产者在休眠
putItemIntoBuffer(item);
up(fillCount);
}
}

procedure consumer() {
while (true) {
down(fillCount);
item = removeItemFromBuffer();
up(emptyCount);
consumeItem(item);
}
}

上述方法在只有一个生产者和一个消费者时能解决问题。对于多个生产者或者多个消费者共享缓冲区的情况,该算法也会导致竞争条件,出现两个或以上的进程同时读或写同一个缓冲区槽的情况。

为了说明这种情况是如何发生的,可以假设 putItemIntoBuffer() 的一种可能的实现:先寻找下一个可用空槽,然后写入数据项。下列情形是可能出现的:

  1. 两个生产者都减少 emptyCount 的值;
  2. 某一生产者寻找到下一个可用空槽;
  3. 另一生产者也找到了下一个可用空槽,结果和上一步被找到的是同一个空槽;
  4. 两个生产者向可用空槽写入数据。

为了解决这个问题,需要在保证同一时刻只有一个生产者能够执行 putItemIntoBuffer()。也就是说,需要寻找一种方法来互斥地执行临界区的代码。为了达到这个目的,可引入一个二值信号灯 mutex,其值只能为 1 或者 0。如果把线程放入 down(mutex) 和 up(mutex) 之间,就可以限制只有一个线程能被执行。多生产者、消费者的解决算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
semaphore mutex = 1;
//只有mutex为1才允许进入修改共享缓冲区
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
down(mutex);
putItemIntoBuffer(item);
up(mutex);
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
down(mutex);
item = removeItemFromBuffer();
up(mutex);
up(emptyCount);
consumeItem(item);
}
}

注意代码中 while 语句的用法,都是用在测试缓冲区是否已满或空的时候。当存在多个消费者时,有可能造成竞争条件的情况是:某一消费者在一项数据被放入缓冲区中时被唤醒,但是另一消费者已经在管程上等待了一段时间并移除了这项数据。如果 while 语句被改成 if,则会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。

2.信号量临界区保护

多个生产者共同修改emptyCount,由于时间分片执行的关系,有可能使得emptyCount的值与真实情况不符合。这种错误是有多个进程并发操作共享数据引起,与调度顺序有关,难以发现和调试。

若这种情况(伪代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
emptyCount = 5;
//reg是寄存器

//生产者P1进行生产
reg = emptyCount;
reg = reg -1;
emptyCount = reg;

//生产者P2进行生产
reg = emptyCount;
reg = reg -1;
emptyCount = reg;

//实际有可能的调度情况
p1.reg = emptyCount; //5
p1.reg = p1.reg -1; //4
p2.reg = emptyCount; //5
p2.reg = p2.reg -1; //4
emptyCount = p1.reg; //4
emptyCount = p2.reg; //4