互斥和同步-生产者/消费者问题

信号量解决互斥

  • 由于所有进程都需要访问共享资源,每个进程进入临界区前执行 semWait(s),若s为负,则进程被阻塞,为正数则-1,进程立即进入临界区,由于s不为正,则其他任何进程不能进入临界区

  • 下面将 缓冲区作为 共享资源

生产者/消费者问题

  • 问题:

    有一个或多个生产者生产某种类型的数据,放置在缓冲区,,有一个消费者从缓冲区取数据,每次取一项,

    操作系统为了保证避免对缓冲区的重复操作,即在任何时候只有一个主体(生产者或消费者)访问缓冲区

    边界条件:

    1. 缓存区满时,生产者不会继续向其中添加数据,
    2. 缓存区为空时,消费者不会从中移走数据

二元信号量解决无限缓冲区的方法(直观但不正确)

缓冲区结构:

先假设缓冲区是一个无限长的线性元素数组

  1. 生产者生产后缓冲区索引(in)增加 1,消费者类似
  2. 为了避免从空的缓冲区读取数据,消费者必须确保已经生产(in > out)
生产者函数(producer) 和 消费者函数(consumer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void producer() {
while (true) {
/* 生产 v (是代码 并非注释 下同)*/;
b[in] = v;
in++;
}
}

void consumer() {
while(true) {
while (in <= out)
/* 数据不足,不做任何事 */;
w = b[out];
out++;
/* 消费w */;
}
}
二元信号量实现
  1. N记录缓冲区数据项的个数
  2. 信号量S实施互斥
  3. 信号量delay用于迫使消费者在缓冲区为空时等待(semWait)
程序伪代码
  • parbegin(p1,p2,p3,p4……):阻塞主程序,初始化并行过程p1,p2,p3,….,所有过程终止后主程序恢复执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* program producerconsumer */
int n;
binary_semaphore s=1,delay=0; // 二元信号量
void producer() {
while (true) {
produce();
semWaitB(s);
append(); // 添加到 缓冲区
n++;
if (n == 1) // 停止消费者在缓冲区的空时等待
semSignalB(delay);
semSignalB(s);
}
}

void consumer() {
semWaitB(delay);
while (true) {
semWaitB(s);
take(); //从缓冲区得到数据
n--;
semSignalB(s);
consume(); // 这里就有问题了:生产者得到信号量立即执行,而消费者还没消费
if (n == 0) // 缓冲区为空 消费者必须空时等待(阻塞态)
semWaitB(delay);
}
}
void main() {
n = 0;
parbegin(producer,consumer);
}
分析:
  • 直观的看到
  1. 生产者任何时候都可以向缓冲区增加数据项,且通过semWaitB(s) / semSignalB(s)阻止消费者和其他生产者访问缓冲区
  2. 生产者在临界区时(生产者在执行),若n==1执行semSignalB(delay)通知消费者停止空时等待,消费者通过semWaitB(delay)等待数据产生
  3. 若 生产者总在消费者之前执行, 那么 消费者很少会被阻塞在信号量delay
缺陷
  • n=0时,第一个生产者(producer)执行后.消费者consumer()开始执行时。
  1. 观察consumer()函数,发现在判断缓冲区是否为空之前就执行了semSignalB(s),那么此时生产者立刻进入临界区,即生产者执行semWaitB(s)开始生产,而此时消费者仍未判断缓冲区是否为空
  2. 生产者又生产一个数据n++后,此时n=1delay = 1,而此时消费者才开始判断if(n == 0)缓冲区大小但此时n已经等于1了,那么if(n==0) semWaitB(delay)跳过,
  3. 跳过后假设生产者未抢占信号量,那么消费者while(true)继续执行下去,又执行了一轮semWaitB(s) / semSignalB(s),此时n=0,if()判断后delay=0,此时消费者继续执行(未被生产者抢占临界区),然后n=-1,if(n==0)判断失效,陷入死锁
总结
  • 问题出在消费者的semWaitB(delay)提前了,导致if(n==0)判断不在临界区中执行,并产生死锁

二元信号量中引入辅助变量解决死锁

原因
  • 消费者的semSignalB(s)执行完后 -> 生产者抢占临界区执行后改变了全局变量n(即消费者未执行完就破破坏了它的数据),开一个临时变量保存消费者的n状态能解决死锁
  • 消费者semSignalB(s)放在while()最后 为什么不能解决死锁?
    • n == 0semWaitB(delay),消费者被delay阻塞 且 不能执行semSignalB(s)释放信号量s
    • 也造成死锁
伪代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*program producerconsumer*/
int n;
binary semaphore s = 1,delay = 0;
void producer() {
while(true) {
produce();
semWaitB(s);
append();
n++;
if (n == 1)
semSignalB(delay);
semSignalB(s);
}
}
void consumer() {
int m;
semWaitB(delay);
while(true) {
semWaitB(s);
take();
n--;
m = n;
semSignalB(s);
consume();
if (m == 0) semWaitB(delay);
}
}
void main() {
n = 0;
parbegin(producer,consumer);
}

一般信号量 解决方法

改进
  • 变量n作为信号量,其值等于缓冲区中的项数
伪代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*program producerconsumer */
semaphore n = 0, s = 1;
void producer() {
while(true) {
produce();
semWait(s);
append();
semSignal(s);
semSignal(n);
}
}
void consumer() {
while (true) {
semWait(n);
semWait(s);
take();
semSignal(s);
consume();
}
}
void main() {
parbegin(producer,consumer);
}
分析
  • producer中增加2个信号量n s 以及 consumer中消耗2个信号量ns

  • 即使 producer执行完semSignal(s)后 -> consumer立即进入临界区,由于需要等待2个信号量,所以consumer还是执行不了

假设semWait(s)semWait(n)互换
  • 如果缓冲区为0,生产者执行完semSignal(s)后还未semSignal(n) ,此时消费者立刻执行semWait(s)进入临界区,那么生产者无法产生数据且消费之一直等待信号量n造成死锁

有限缓冲区

  • 仍是线性元素数组且长度有限,通过对指针的大小取模来达到循环效果
生产者函数(producer)和消费者函数consumer表示形式

producer:

1
2
3
4
5
6
while (true) {
while ((in+1) % n == out) // 说明生产者指针要追上消费者了 即缓冲区满了,那么不能继续生产
/*啥都不做*/
b[in] = v;
in = (in + 1) % n;
}

consumet:

1
2
3
4
5
6
7
while(true) {
while(in == out) //缓冲区为空,则等待,
/*啥都不做,因为不能取*/
w = b[out];
out = (out + 1) % n;
/*消费 w */;
}
信号量解决有限缓冲区生产者/消费者问题方法
  • n:缓冲区数据项的个数,e:空余空间的数量 s:互斥条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*program boundedbuffer */
const int sizeofbuffer = /*缓冲区大小*/;
semaphore s=1,n=0,e=sizeofbuffer;
void producer() {
while(true) {
produce();
semWait(e);
semWait(s);
append();
semSignal(s);
semSignal(n);
}
}
void consumer() {
while (true) {
semWait(n);
semWait(s);
take();
semSignal(s);
semSignal(e);
consume();
}
}
void main() {
parbegin(producer,consumer);
}