进程互斥实验

为了能够体验IPC机制的消息队列的用法,示例程序采用了 Theaker &Brookes 提出的消息传递算法。该算法中有一控制进程,带有3个不同类型的消息信箱,它们分别是:读请求信箱、写请求信箱和操作完成信箱。读者需要
访问临界资源时首先要向控制进程发送读请求消息,写者需要访问临界资源时也要先向控制进程发送写请求消息,在得到控制进程的允许消息后方可进入临界区读或写。读或写者在完成对临界资源的访问后还要向控制进程发送操作完成消息。

控制进程使用一个变量count控制读写者互斥的访问临界资源并允许写者优先count的初值需要一个比最大读者数还要大的数,本例取值为100。当count 大于0 时说明没有新的读写请求,控制进程接收读写者新的请求,如果收到读者完成消息,对count 的值加1,如果收到写者请求消息,count的值减100,如果收到读者请求消息,对count的值减1。 当count等于0时说明写者正在写,控制进程等待写者完成后再次令count 的值等于100。 当count小于0 时说明读者正在读,控制进程等待读者完成后对count 的值加1。

Linux系统调用语法

创建消息队列

1
2
#include<sys/msg.h>
int msgget(key_t key,int flags)

key 消息队列的键值,可以为IPC_PRIVATE,也可以用整数指定一个

flags 消息队列权限位。

msgget 调用成功,如果key 用新整数指定,且flags 中设置了IPC_CREAT位,则返回一个新建立的消息队列标识符。 如果指定的整数key 已存在则返回与key关联的标识符。成功返回-1。

追加一条新消息到消息队列

1
2
#include <sys.msg.h>
int msgsnd(int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg);

msqid 由消息队列的标识符

msgp 消息缓冲区指针。消息缓冲区结构为:

1
2
3
4
struct msgbuf {
long mtype; /* 消息类型,必须大于0 */
char mtext[1]; /* 消息数据,长度应于msgsz 声明的一致*/

msgsz 消息数据的长度

msgflg 为0表示阻塞方式,设置IPC_NOWAIT表示非阻塞方式

msgsnd调用成功返回0,不成功返回-1。

从到消息队列中读出一条新消息

1
2
#include <sys.msg.h>
int msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtype, int msgflg);

msqid 消息队列的标识符

msgp 消息缓冲区指针。消息缓冲区结构为:

1
2
3
4
struct msgbuf {
long mtype; /* 消息类型,必须大于0 */
char mtext[1]; /* 消息数据,长度应于msgsz 声明的一致*/

msgsz 消息数据的长度

msgtype 决定从队列中返回哪条消息:

  • =0 返回消息队列中第一条消息

  • >0 返回消息队列中等于mtype类型的第一条消息。

  • <0 返回 mtype<=type绝对值最小值的第一条消息。

msgflg 为0表示阻塞方式,设置IPC_NOWAIT表示非阻塞方式

msgrcv调用成功返回0,不成功返回-1。

删除消息队列

1
2
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msqid 由消息队列的标识符

cmd 控制命令。常用的有:

  • IPC_RMID 删除msgid标识的消息队列
  • IPC_STAT 为非破坏性读,从队列中读出一个msgid_ds结构填充缓冲buf
  • IPC_SET 改变队列的UID,GID,访问模式和最大字节数。

msgctl调用成功返回0,不成功返回-1。

思路

首先,我们对程序要有最基本的构思:一个控制台运行理发师程序,这个程序要创建三个进程(包括最初的父进程)作为三个理发师。另一个控制台运行顾客程序,负责不断产生顾客,同时顾客要满足题目中给定的沙发与等候室的数量要求。

其次,由于使用消息队列,我们可以这样定义理发师与消费者的行为。

对于理发师,采用非阻塞的消息队列,如果沙发申请的消息队列中能收到消息,说明有顾客来理发了,理发师向沙发回应的队列中发送消息,释放一个沙发位置,相当于为该坐在沙发上的顾客理发,并互斥地完成结账。

对于顾客,逻辑就要更复杂一些。首先,我们要进行判断,如果沙发上就坐的人数少于4个,那么由于我们要保证等候室的顾客先占用沙发的位置,就需要先看等候室人数是否为0,不为0则将一个等候室中的顾客调入沙发(向等候室回应的消息队列发送消息),为0就说明店内总人数还没有将沙发坐满,优先进入沙发的位置,但无论等候室人数多少,都会向沙发申请的队列发送消息。当沙发上坐满四个人,我们就让顾客去等候室等待(向等候室申请的消息队列发送消息),再否则,阻塞掉当前生产顾客的进程,认为顾客不再进入理发店。

其次,在上述判断之外,我们还需要添加沙发回应和等候室回应两个消息队列中消息的处理方法,其实就是没接受到一个回应,就将相应位置的人数减一。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//ipc.h
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/msg.h>

#define BUFSZ 256
#define MAXVAL 100
#define STRSIZ 8
#define WRITERQUEST 1 //写请求标识
#define READERQUEST 2 //读请求标识
#define FINISHED 3 //读写完成标识

/*信号灯控制用的共同体*/
typedef union semuns {
int val;
} Sem_uns;

/* 消息结构体*/
typedef struct msgbuf {
long mtype;
int mid;
} Msg_buf;


//信号量
key_t account_key;
int account_sem;

int sem_val;
int sem_flg;

//消息队列
int wait_quest_flg;
key_t wait_quest_key;
int wait_quest_id;
int wait_respond_flg;
key_t wait_respond_key;
int wait_respond_id;

int sofa_quest_flg;
key_t sofa_quest_key;
int sofa_quest_id;
int sofa_respond_flg;
key_t sofa_respond_key;
int sofa_respond_id;

int get_ipc_id(char *proc_file,key_t key);
char *set_shm(key_t shm_key,int shm_num,int shm_flag);
int set_msq(key_t msq_key,int msq_flag);
int set_sem(key_t sem_key,int sem_val,int sem_flag);
int down(int sem_id);
int up(int sem_id);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//ipc.c
#include "ipc.h"

int get_ipc_id(char *proc_file,key_t key) {
FILE *pf;
int i,j;
char line[BUFSZ],colum[BUFSZ];
if((pf = fopen(proc_file,"r")) == NULL){
perror("Proc file not open");
exit(EXIT_FAILURE);
}
fgets(line, BUFSZ,pf);
while(!feof(pf)){
i = j = 0;
fgets(line, BUFSZ,pf);
while(line[i] == ' ') i++;
while(line[i] !=' ')
colum[j++] = line[i++];
colum[j] = '\0';
if(atoi(colum) != key)
continue;
j=0;
while(line[i] == ' ')
i++;
while(line[i] !=' ')
colum[j++] = line[i++];
colum[j] = '\0';
i = atoi(colum);
fclose(pf);
return i;
}
fclose(pf);
return -1;
}
int down(int sem_id) {
struct sembuf buf;
buf.sem_op = -1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
if((semop(sem_id,&buf,1)) <0) {
perror("down error ");
exit(EXIT_FAILURE);
}
return EXIT_SUCCESS;
}

int up(int sem_id) {
struct sembuf buf;
buf.sem_op = 1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
if((semop(sem_id,&buf,1)) <0) {
perror("up error ");
exit(EXIT_FAILURE);
}
return EXIT_SUCCESS;
}

int set_sem(key_t sem_key,int sem_val,int sem_flg) {
int sem_id; Sem_uns sem_arg;

//测试由 sem_key 标识的信号灯数组是否已经建立
if((sem_id = get_ipc_id("/proc/sysvipc/sem",sem_key)) < 0 ){
//semget 新建一个信号灯,其标号返回到 sem_id
if((sem_id = semget(sem_key,1,sem_flg)) < 0){
perror("semaphore create error");
exit(EXIT_FAILURE);
}
//设置信号灯的初值
sem_arg.val = sem_val;
if(semctl(sem_id,0,SETVAL,sem_arg) <0){
perror("semaphore set error");
exit(EXIT_FAILURE);
}
}
return sem_id;
}

char * set_shm(key_t shm_key,int shm_num,int shm_flg) {
int i,shm_id;
char * shm_buf;

//测试由 shm_key 标识的共享内存区是否已经建立
if((shm_id = get_ipc_id("/proc/sysvipc/shm",shm_key)) < 0 ){
//shmget 新建 一个长度为 shm_num 字节的共享内存,其标号返回到 shm_id
if((shm_id = shmget(shm_key,shm_num,shm_flg)) <0){
perror("shareMemory set error");
exit(EXIT_FAILURE);
}
//shmat 将由 shm_id 标识的共享内存附加给指针 shm_buf
if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0){
perror("get shareMemory error");
exit(EXIT_FAILURE);
}
for(i=0; i<shm_num; i++) shm_buf[i] = 0;
//初始为 0
}
//shm_key 标识的共享内存区已经建立,将由 shm_id 标识的共享内存附加给指 针 shm_buf
if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0){
perror("get shareMemory error");
exit(EXIT_FAILURE);
}
return shm_buf;
}

int set_msq(key_t msq_key,int msq_flg){
int msq_id;
//测试由 msq_key 标识的消息队列是否已经建立
if((msq_id = get_ipc_id("/proc/sysvipc/msg",msq_key)) < 0 ){
//msgget 新建一个消息队列,其标号返回到 msq_id
if((msq_id = msgget(msq_key,msq_flg)) < 0){
perror("messageQueue set error");
exit(EXIT_FAILURE);
}
}
return msq_id;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//customer.c
#include "ipc.h"
#include <unistd.h>
int main(int argc,char *argv[])
{
int rate;
Msg_buf msg_arg;
msg_arg.mtype = 1;
//可在在命令行第一参数指定一个进程睡眠秒数,以调解进程执行速度
if(argv[1] != NULL) rate = atoi(argv[1]);
else rate = 3;

//联系一个请求消息队列
wait_quest_flg = IPC_CREAT| 0644;
wait_quest_key = 401;
wait_quest_id = set_msq(wait_quest_key,wait_quest_flg);
//联系一个响应消息队列
wait_respond_flg = IPC_CREAT| 0644;
wait_respond_key = 402;
wait_respond_id = set_msq(wait_respond_key,wait_respond_flg);

//联系一个请求消息队列
sofa_quest_flg = IPC_CREAT| 0644;
sofa_quest_key = 501;
sofa_quest_id = set_msq(sofa_quest_key,sofa_quest_flg);
//联系一个响应消息队列
sofa_respond_flg = IPC_CREAT| 0644;
sofa_respond_key = 502;
sofa_respond_id = set_msq(sofa_respond_key,sofa_respond_flg);

int sofa_count=0;
int wait_count=0;
int i = getpid() * 100;//顾客标号
while(1) {
sleep(rate);
i++;
msg_arg.mid = i;
printf("sofa 人数 %d,waiting room 人数 %d\n",sofa_count,wait_count);
if(sofa_count < 4) {
if(wait_count != 0) {
//从等候室向沙发调人
//阻塞方式接收消息
msgrcv(wait_quest_id, &msg_arg, sizeof(msg_arg), 0, 0);
msgsnd(wait_respond_id, &msg_arg,sizeof(msg_arg), 0);
printf("customer%d in pid %d move from waiting room to sofa\n", msg_arg.mid,getpid());
} else {
printf("customer%d in pid %d sit sofa\n", i,getpid());
}

sofa_quest_flg=IPC_NOWAIT;
msgsnd(sofa_quest_id, &msg_arg, sizeof(msg_arg), sofa_quest_flg);
sofa_count++;

} else if(wait_count < 13) {
printf("sofa is full, customer%d in pid %d is waiting in the waiting room\n", i,getpid());
wait_quest_flg = IPC_NOWAIT;
msgsnd(wait_quest_id, &msg_arg, sizeof(msg_arg), wait_quest_flg);
wait_count++;

} else {
printf("waiting room is full,customer%d in pid %d can't get into barber shop\n", i,getpid());
//阻塞掉,不再产生顾客
msgrcv(sofa_respond_id, &msg_arg, sizeof(msg_arg), 0, 0);
sofa_count--;
}

sofa_quest_flg = IPC_NOWAIT;
if(msgrcv(sofa_respond_id, &msg_arg, sizeof(msg_arg), 0, sofa_quest_flg)>=0){
sofa_count--;
}
wait_quest_flg = IPC_NOWAIT;
if(msgrcv(wait_respond_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) {
wait_count--;
}
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//barber.c
#include "ipc.h"
#include <unistd.h>
int main(int argc,char *argv[])
{
// int i;
int rate;
Msg_buf msg_arg;
msg_arg.mtype = 2;
//可在在命令行第一参数指定一个进程睡眠秒数,以调解进程执行速度
if(argv[1] != NULL) rate = atoi(argv[1]);
else rate = 3;

//联系一个请求消息队列
sofa_quest_flg = IPC_CREAT| 0644;
sofa_quest_key = 501;
sofa_quest_id = set_msq(sofa_quest_key,sofa_quest_flg);
//联系一个响应消息队列
sofa_respond_flg = IPC_CREAT| 0644;
sofa_respond_key = 502;
sofa_respond_id = set_msq(sofa_respond_key,sofa_respond_flg);

//信号量使用的变量
account_key = 602;//账簿互斥信号灯键值
sem_flg = IPC_CREAT | 0644;
//账簿互斥信号灯初值设为 1
sem_val = 1;
//获取账簿互斥信号灯,引用标识存 cons_sem
account_sem = set_sem(account_key,sem_val,sem_flg);

int pid1, pid2;
pid1=fork();
if(pid1==0) {
while(1) {
wait_quest_flg=IPC_NOWAIT;
sleep(rate);

if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) {
msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0);
printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid);

down(account_sem);
printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid);
up(account_sem);
}else {
printf("%d barber is sleeping\n", getpid());
}
}
} else {
pid2=fork();
if(pid2==0) {
while(1) {
wait_quest_flg=IPC_NOWAIT;
sleep(rate);
if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) {
msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0);
printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid);

down(account_sem);
printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid);
up(account_sem);
} else {
printf("%d barber is sleeping\n", getpid());

}
}
} else {
while(1) {
wait_quest_flg=IPC_NOWAIT;
sleep(rate);
if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) {
msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0);
printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid);

down(account_sem);
printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid);
up(account_sem);
} else {
printf("%d barber is sleeping\n", getpid());

}
}
}
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
hdrs = ipc.h
r_src = barber.c ipc.c
r_obj = barber.o ipc.o
w_src = costumer.c ipc.c
w_obj = costumer.o ipc.o
opts = -g -c

all: costumer barber

barber: $(r_obj)
gcc $(r_obj) -o barber

barber.o: $(r_src) $(hdrs)
gcc $(opts) $(r_src)

costumer: $(w_obj)
gcc $(w_obj) -o costumer

costumer.o: $(w_src) $(hdrs)
gcc $(opts) $(w_src)

clean:
rm barber costumer *.o

遇到的一些问题与解决方案

如果发现一个消息一直无法被添加到消息队列,也就是说,msgsnd一直返回-1,可以检查msg_arg.mtype或者msg_arg.mid是否进行了赋值。

一些美中不足

顾客的进程是进程间不安全的,也就是说,没有进行进程间同步。示例实验采用的方案是将读写与控制器分离但在独立实验中这样做显得有些困难。另一种解决方案是将沙发人数与等候室人数放入共享内存,但这样的话,每个顾客生产进程就要涉及到对共享内存内容的初始化,也就是说,我们无法保证共享内存只被初始化一次。

我想到的一种方案是在共享内存中再加一个判断是否已经初始化的布尔值,来控制初始化次数,但这样的话,我们就必须引入对共享内存的三个不同值的读写的互斥锁,会导致整个代码体系变得更加臃肿。