互斥和同步-消费者/生产者 (管程+消息传递)

管程

  • 由一个或多个过程、一个初始化序列和局部数据组成的软件模块,主要特点:

    >1. 局部数据变量只能被**管程的过程**访问,任何外部过程都不能访问
      >2. 一个进程通过调用**管程的一个过程**进入管程.
      >3. 只能有一个进程在管程中执行,调用管程的任何其他进程都被**阻塞**,以等待管程可用
  • 管程的互斥机制:

    管程的数据每次只能被一个进程访问

    通过放入 共享数据结构 从而实现访问该资源的进程的互斥

  • 管程的同步工具:

    目的: 当调用管程的进程被阻塞时 需要释放该管程供其他进程进入,当条件满足且管程可用则 进程从阻塞点重新进入管程.

    解决方法: 使用条件变量支持同步, 提供2个函数来操作条件变量

    1. cwait(c) : 进程被条件c阻塞,管程可被其他进程调用
    2. csignal(c) : 恢复因条件c而被阻塞的进程, 若有多个这样的进程,且选择其中的一个
  • 管程的结构

    1. 入口: 等待进入管程的进程队列
    2. 管程等待区域: 条件等待队列 和 紧急队列
    3. 管程的数据区域: 局部数据 条件变量 过程(1,2,3,4….) 初始化代码
  • 管程的工作流程

    1. 入口队列保证一次只能有一个进程可以进入, 试图进入管程的进程被阻塞加入等待管程可用的进程队列
    2. 当使用管程的进程执行cwait(c)把自己阻塞在条件c上,则加入等待条件改变重进进入管程的队列 (队列在管程内部)

管程解决有界缓冲区 生产者\消费者 问题

  • 管程的2个条件变量:

    1. notfull :缓冲区至少有增加一个字符的空间时,notfull为真
    2. notempty : 缓冲区至少还有一个字符时, notemprt为真
伪代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/* program producerconsume */
moniter boundedbuffer;
char buffer[N]; //缓冲区N个数据项
int nextin,nextout; // 缓冲区指针
int count; // 缓冲区数据项个数
cond notfull,notempty; // 条件变量

// 初始化缓冲区
{
nextin = 0;
nextout = 0;
count = 0;
}

void append(char x) {
if (count == N) // 缓冲区满,防止溢出
cwait(notfull);
buffer[nextin] = x;
nextin = (nextin + 1) % N;
count++;
csignal(notempty); // 缓冲区添加数据后 可以供消费者使用
}
void take(char x) {
if (count == 0) //缓冲区为空 防止下溢
cwait(notempty);
x = buffer[nextout];
nextout = (nextout + 1) % N;
count--;
csignal(notfull); // 生产者可以继续生产
}


// main
void producer() {
char x;
while (true) {
produce(x);
append(x);
}
}
void consume() {
char x;
while (true) {
take(x);
consume(x);
}
}
void main() {
parbegin(producer,consumer);
}

  • 优点:
    1. 管程有自己的互斥机制,通过控制条件变量完成进程的同步就行
    2. 使用信号量的话 互斥 和 同步 都要自己实现

消息传递

  • 实际功能由原语提供: 原语:进程间消息传递的最小操作集

    >send(destination,message);
      >
      >receive(source,message);
  • 进程执行send原语2种可能
    1. 不被阻塞
    2. 发送的进程被阻塞 知道 消息被目标进程所接受
  • 进程执行receive原语后的可能
    1. 直接受到消息 进程继续执行
    2. 等待消息:
      • 该进程被阻塞直到消息到达
      • 进程继续执行,放弃消息接受

三种组合

  1. 阻塞send,阻塞receive:都被阻塞,直到消息完成传递,使得进程紧密同步
  2. 无阻塞send,阻塞receive: 发送进程继续执行,而接受进程被阻塞直到请求消息到达
  3. 无阻塞send,无阻塞receive

直接寻址

  • 上面都是直接寻址通过senddestinationreceivesource参数 例如:目标进程标识号

间接寻址

  • 消息不从发出者直接到达接受者, 而是发送到一个共享的数据结构,由临时保存消息的队列组成,通常称为信箱
  • 常见关系:
    • 1对 1 :专用通信链接
    • 多对1 :常见客户-服务器交互. 服务器由一个进程给其他许多进程提供服务(消息服务?)**,此时信箱常被称为端口**
    • 1 对 多 : 某进程广播一条消息
    • 多 对 多:

利用 消息传递 解决有限缓冲区 生产者\消费者 问题

  • 思路

    1. 通过 消息传递的间接寻址, 除了传递信号外还传递数据

    2. 生产者将产生的数据作为消息发到信箱mayconsume,只要该信箱还有一条消息,消费者就可以开始消费

    3. 那么mayconsume就作为缓冲区,全局变量capacity确定缓冲区大小,

    4. 信箱mayproduce开始时填满空消息,且等于信箱的容量,每次生产使得mayproduce中的消息减少

      mayconsume中的消息增多

  • 通过这2个信箱(消息队列),可以有多个生产者和消费者,

伪代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const int capacity = /*缓冲区容量*/;
message null = "空消息";
int i;
void producer() {
message pmsg;
while(true) {
receive (mayproduce,pmsg);
pmsg = produce();
send (mayconsume,pmsg);
}
}
void consumer() {
message cmsg;
while (true) {
receive (mayconsume,cmsg);
consume(cmsg);
send (mayproduce,null);
}
}
void main() {
create_mailbox(mayproduce);
create_mailbox(mayconsume);
for (int i=1; i<=capacity; ++i)
send (mayproduce,null); // mayproduce插入空消息
parbegin(producer,consumer);
}