进程互斥实验
为了能够体验IPC机制的消息队列的用法,示例程序采用了 Theaker &Brookes
提出的消息传递算法。该算法中有一控制进程,带有3个不同类型的消息信箱,它们分别是:读请求信箱、写请求信箱和操作完成信箱。读者需要
访问临界资源时首先要向控制进程发送读请求消息,写者需要访问临界资源时也要先向控制进程发送写请求消息,在得到控制进程的允许消息后方可进入临界区读或写。读或写者在完成对临界资源的访问后还要向控制进程发送操作完成消息。
控制进程使用一个变量count
控制读写者互斥的访问临界资源并允许写者优先。 count
的初值需要一个比最大读者数还要大的数,本例取值为100。当count
大于0 时说明没有新的读写请求,控制进程接收读写者新的请求,如果收到读者完成消息,对count
的值加1,如果收到写者请求消息,count
的值减100,如果收到读者请求消息,对count
的值减1。 当count
等于0时说明写者正在写,控制进程等待写者完成后再次令count
的值等于100。 当count
小于0 时说明读者正在读,控制进程等待读者完成后对count
的值加1。
Linux系统调用语法
创建消息队列
1 2
| #include<sys/msg.h> int msgget(key_t key,int flags)
|
key
消息队列的键值,可以为IPC_PRIVATE
,也可以用整数指定一个
flags
消息队列权限位。
msgget
调用成功,如果key
用新整数指定,且flags
中设置了IPC_CREAT
位,则返回一个新建立的消息队列标识符。 如果指定的整数key
已存在则返回与key
关联的标识符。成功返回-1。
追加一条新消息到消息队列
1 2
| #include <sys.msg.h> int msgsnd(int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg);
|
msqid
由消息队列的标识符
msgp
消息缓冲区指针。消息缓冲区结构为:
1 2 3 4
| struct msgbuf { long mtype; char mtext[1]; }
|
msgsz
消息数据的长度
msgflg
为0表示阻塞方式,设置IPC_NOWAIT
表示非阻塞方式
msgsnd
调用成功返回0,不成功返回-1。
从到消息队列中读出一条新消息
1 2
| #include <sys.msg.h> int msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtype, int msgflg);
|
msqid
消息队列的标识符
msgp
消息缓冲区指针。消息缓冲区结构为:
1 2 3 4
| struct msgbuf { long mtype; char mtext[1]; }
|
msgsz
消息数据的长度
msgtype
决定从队列中返回哪条消息:
msgflg
为0表示阻塞方式,设置IPC_NOWAIT
表示非阻塞方式
msgrcv
调用成功返回0,不成功返回-1。
删除消息队列
1 2
| #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf);
|
msqid
由消息队列的标识符
cmd
控制命令。常用的有:
IPC_RMID
删除msgid
标识的消息队列IPC_STAT
为非破坏性读,从队列中读出一个msgid_ds
结构填充缓冲buf
IPC_SET
改变队列的UID,GID
,访问模式和最大字节数。
msgctl
调用成功返回0,不成功返回-1。
思路
首先,我们对程序要有最基本的构思:一个控制台运行理发师程序,这个程序要创建三个进程(包括最初的父进程)作为三个理发师。另一个控制台运行顾客程序,负责不断产生顾客,同时顾客要满足题目中给定的沙发与等候室的数量要求。
其次,由于使用消息队列,我们可以这样定义理发师与消费者的行为。
对于理发师,采用非阻塞的消息队列,如果沙发申请的消息队列中能收到消息,说明有顾客来理发了,理发师向沙发回应的队列中发送消息,释放一个沙发位置,相当于为该坐在沙发上的顾客理发,并互斥地完成结账。
对于顾客,逻辑就要更复杂一些。首先,我们要进行判断,如果沙发上就坐的人数少于4个,那么由于我们要保证等候室的顾客先占用沙发的位置,就需要先看等候室人数是否为0,不为0则将一个等候室中的顾客调入沙发(向等候室回应的消息队列发送消息),为0就说明店内总人数还没有将沙发坐满,优先进入沙发的位置,但无论等候室人数多少,都会向沙发申请的队列发送消息。当沙发上坐满四个人,我们就让顾客去等候室等待(向等候室申请的消息队列发送消息),再否则,阻塞掉当前生产顾客的进程,认为顾客不再进入理发店。
其次,在上述判断之外,我们还需要添加沙发回应和等候室回应两个消息队列中消息的处理方法,其实就是没接受到一个回应,就将相应位置的人数减一。
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/sem.h> #include <sys/msg.h>
#define BUFSZ 256 #define MAXVAL 100 #define STRSIZ 8 #define WRITERQUEST 1 #define READERQUEST 2 #define FINISHED 3
typedef union semuns { int val; } Sem_uns;
typedef struct msgbuf { long mtype; int mid; } Msg_buf;
key_t account_key; int account_sem;
int sem_val; int sem_flg;
int wait_quest_flg; key_t wait_quest_key; int wait_quest_id; int wait_respond_flg; key_t wait_respond_key; int wait_respond_id;
int sofa_quest_flg; key_t sofa_quest_key; int sofa_quest_id; int sofa_respond_flg; key_t sofa_respond_key; int sofa_respond_id;
int get_ipc_id(char *proc_file,key_t key); char *set_shm(key_t shm_key,int shm_num,int shm_flag); int set_msq(key_t msq_key,int msq_flag); int set_sem(key_t sem_key,int sem_val,int sem_flag); int down(int sem_id); int up(int sem_id);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| #include "ipc.h"
int get_ipc_id(char *proc_file,key_t key) { FILE *pf; int i,j; char line[BUFSZ],colum[BUFSZ]; if((pf = fopen(proc_file,"r")) == NULL){ perror("Proc file not open"); exit(EXIT_FAILURE); } fgets(line, BUFSZ,pf); while(!feof(pf)){ i = j = 0; fgets(line, BUFSZ,pf); while(line[i] == ' ') i++; while(line[i] !=' ') colum[j++] = line[i++]; colum[j] = '\0'; if(atoi(colum) != key) continue; j=0; while(line[i] == ' ') i++; while(line[i] !=' ') colum[j++] = line[i++]; colum[j] = '\0'; i = atoi(colum); fclose(pf); return i; } fclose(pf); return -1; } int down(int sem_id) { struct sembuf buf; buf.sem_op = -1; buf.sem_num = 0; buf.sem_flg = SEM_UNDO; if((semop(sem_id,&buf,1)) <0) { perror("down error "); exit(EXIT_FAILURE); } return EXIT_SUCCESS; }
int up(int sem_id) { struct sembuf buf; buf.sem_op = 1; buf.sem_num = 0; buf.sem_flg = SEM_UNDO; if((semop(sem_id,&buf,1)) <0) { perror("up error "); exit(EXIT_FAILURE); } return EXIT_SUCCESS; }
int set_sem(key_t sem_key,int sem_val,int sem_flg) { int sem_id; Sem_uns sem_arg;
if((sem_id = get_ipc_id("/proc/sysvipc/sem",sem_key)) < 0 ){ if((sem_id = semget(sem_key,1,sem_flg)) < 0){ perror("semaphore create error"); exit(EXIT_FAILURE); } sem_arg.val = sem_val; if(semctl(sem_id,0,SETVAL,sem_arg) <0){ perror("semaphore set error"); exit(EXIT_FAILURE); } } return sem_id; }
char * set_shm(key_t shm_key,int shm_num,int shm_flg) { int i,shm_id; char * shm_buf;
if((shm_id = get_ipc_id("/proc/sysvipc/shm",shm_key)) < 0 ){ if((shm_id = shmget(shm_key,shm_num,shm_flg)) <0){ perror("shareMemory set error"); exit(EXIT_FAILURE); } if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0){ perror("get shareMemory error"); exit(EXIT_FAILURE); } for(i=0; i<shm_num; i++) shm_buf[i] = 0; } if((shm_buf = (char *)shmat(shm_id,0,0)) < (char *)0){ perror("get shareMemory error"); exit(EXIT_FAILURE); } return shm_buf; }
int set_msq(key_t msq_key,int msq_flg){ int msq_id; if((msq_id = get_ipc_id("/proc/sysvipc/msg",msq_key)) < 0 ){ if((msq_id = msgget(msq_key,msq_flg)) < 0){ perror("messageQueue set error"); exit(EXIT_FAILURE); } } return msq_id; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| #include "ipc.h" #include <unistd.h> int main(int argc,char *argv[]) { int rate; Msg_buf msg_arg; msg_arg.mtype = 1; if(argv[1] != NULL) rate = atoi(argv[1]); else rate = 3;
wait_quest_flg = IPC_CREAT| 0644; wait_quest_key = 401; wait_quest_id = set_msq(wait_quest_key,wait_quest_flg); wait_respond_flg = IPC_CREAT| 0644; wait_respond_key = 402; wait_respond_id = set_msq(wait_respond_key,wait_respond_flg);
sofa_quest_flg = IPC_CREAT| 0644; sofa_quest_key = 501; sofa_quest_id = set_msq(sofa_quest_key,sofa_quest_flg); sofa_respond_flg = IPC_CREAT| 0644; sofa_respond_key = 502; sofa_respond_id = set_msq(sofa_respond_key,sofa_respond_flg);
int sofa_count=0; int wait_count=0; int i = getpid() * 100; while(1) { sleep(rate); i++; msg_arg.mid = i; printf("sofa 人数 %d,waiting room 人数 %d\n",sofa_count,wait_count); if(sofa_count < 4) { if(wait_count != 0) { msgrcv(wait_quest_id, &msg_arg, sizeof(msg_arg), 0, 0); msgsnd(wait_respond_id, &msg_arg,sizeof(msg_arg), 0); printf("customer%d in pid %d move from waiting room to sofa\n", msg_arg.mid,getpid()); } else { printf("customer%d in pid %d sit sofa\n", i,getpid()); }
sofa_quest_flg=IPC_NOWAIT; msgsnd(sofa_quest_id, &msg_arg, sizeof(msg_arg), sofa_quest_flg); sofa_count++;
} else if(wait_count < 13) { printf("sofa is full, customer%d in pid %d is waiting in the waiting room\n", i,getpid()); wait_quest_flg = IPC_NOWAIT; msgsnd(wait_quest_id, &msg_arg, sizeof(msg_arg), wait_quest_flg); wait_count++;
} else { printf("waiting room is full,customer%d in pid %d can't get into barber shop\n", i,getpid()); msgrcv(sofa_respond_id, &msg_arg, sizeof(msg_arg), 0, 0); sofa_count--; } sofa_quest_flg = IPC_NOWAIT; if(msgrcv(sofa_respond_id, &msg_arg, sizeof(msg_arg), 0, sofa_quest_flg)>=0){ sofa_count--; } wait_quest_flg = IPC_NOWAIT; if(msgrcv(wait_respond_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) { wait_count--; } }
return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| #include "ipc.h" #include <unistd.h> int main(int argc,char *argv[]) { int rate; Msg_buf msg_arg; msg_arg.mtype = 2; if(argv[1] != NULL) rate = atoi(argv[1]); else rate = 3;
sofa_quest_flg = IPC_CREAT| 0644; sofa_quest_key = 501; sofa_quest_id = set_msq(sofa_quest_key,sofa_quest_flg); sofa_respond_flg = IPC_CREAT| 0644; sofa_respond_key = 502; sofa_respond_id = set_msq(sofa_respond_key,sofa_respond_flg);
account_key = 602; sem_flg = IPC_CREAT | 0644; sem_val = 1; account_sem = set_sem(account_key,sem_val,sem_flg);
int pid1, pid2; pid1=fork(); if(pid1==0) { while(1) { wait_quest_flg=IPC_NOWAIT; sleep(rate); if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) { msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0); printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid); down(account_sem); printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid); up(account_sem); }else { printf("%d barber is sleeping\n", getpid()); } } } else { pid2=fork(); if(pid2==0) { while(1) { wait_quest_flg=IPC_NOWAIT; sleep(rate); if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) { msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0); printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid); down(account_sem); printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid); up(account_sem); } else { printf("%d barber is sleeping\n", getpid());
} } } else { while(1) { wait_quest_flg=IPC_NOWAIT; sleep(rate); if(msgrcv(sofa_quest_id, &msg_arg, sizeof(msg_arg), 0, wait_quest_flg)>=0) { msgsnd(sofa_respond_id, &msg_arg,sizeof(msg_arg), 0); printf("%d barber is serving for %d customer \n", getpid(), msg_arg.mid); down(account_sem); printf("%d barber is collect %d customer's money\n", getpid(), msg_arg.mid); up(account_sem); } else { printf("%d barber is sleeping\n", getpid());
} } } } return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| hdrs = ipc.h r_src = barber.c ipc.c r_obj = barber.o ipc.o w_src = costumer.c ipc.c w_obj = costumer.o ipc.o opts = -g -c
all: costumer barber
barber: $(r_obj) gcc $(r_obj) -o barber
barber.o: $(r_src) $(hdrs) gcc $(opts) $(r_src)
costumer: $(w_obj) gcc $(w_obj) -o costumer
costumer.o: $(w_src) $(hdrs) gcc $(opts) $(w_src)
clean: rm barber costumer *.o
|
遇到的一些问题与解决方案
如果发现一个消息一直无法被添加到消息队列,也就是说,msgsnd
一直返回-1
,可以检查msg_arg.mtype
或者msg_arg.mid
是否进行了赋值。
一些美中不足
顾客的进程是进程间不安全的,也就是说,没有进行进程间同步。示例实验采用的方案是将读写与控制器分离但在独立实验中这样做显得有些困难。另一种解决方案是将沙发人数与等候室人数放入共享内存,但这样的话,每个顾客生产进程就要涉及到对共享内存内容的初始化,也就是说,我们无法保证共享内存只被初始化一次。
我想到的一种方案是在共享内存中再加一个判断是否已经初始化的布尔值,来控制初始化次数,但这样的话,我们就必须引入对共享内存的三个不同值的读写的互斥锁,会导致整个代码体系变得更加臃肿。