#include<stdio.h> #include<unistd.h> #include<sys/types.h> main() { intfd[2]; pid_t childpid; pipe(fd); if((childpid=fork())==-1) { perror("fork"); exit(1); } if(childpid==0) { /*Child PRocess closes up in put side of pipe*/ close(fd[0]); } else { /*Parent process closes up out put side of pipe*/ close(fd[1]); }.. }
正如前面提到的,一但创建了管道之后,管道所使用的文件描述符就和正常文件的文件描述符一样了。
#include<stdio.h> #include<unistd.h> #include<sys/types.h> intmain(void) { intfd[2],nbytes; pid_tchildpid; charstring[]="Hello,world! "; charreadbuffer[80]; pipe(fd); if((childpid=fork())==-1) { perror("fork"); exit(1); } if(childpid==0) { /*Child process closes up in put side of pipe*/ close(fd[0]); /*Send"string"through the out put side of pipe*/
write(fd[1],string,strlen(string)); exit(0); } else { /*Parent process closes up out put side of pipe*/ close(fd[1]); /*Readinastringfromthepipe*/ nbytes=read(fd[0],readbuffer,sizeof(readbuffer)); printf("Receivedstring:%s",readbuffer); } return(0); }
再看下面的程序: .. childpid=fork(); if(childpid==0) { /*Close up standard input of the child*/ close(0); /*Dup licate the input side of pipe to stdin*/ dup(fd[0]); execlp("sort","sort",NULL); . } 因为文件描述符0(stdin)被关闭,所以dup()把管道的输入描述符复制到它的标准输入中。这样我们可以调用execlp(),使用sort程序覆盖子进程的正文段。因为新创建的程序从它的父进程中继续了标准输入/输出流,所以它实际上继续了管道的输入端作为它的标准输入端。现在,最初的父进程送往管道的任何数据都将会直接送往sort函数。
intsend_message(int qid,struct mymsgbuf *qbuf) { intresult,length; /*The length is essentially the size of the structure minus sizeof(mtype)*/ length=sizeof(structmymsgbuf)-sizeof(long); if((result=msgsnd(qid,qbuf,length,0))==-1) { return(-1); } return(result); }
int change_queue_mode(int qid, char *mode ) { struct msqid_ds tmpbuf; /* Retrieve a current copy of the internal data structure */ get_queue_ds( qid, &tmpbuf); /* Change the permissions using an old trick */ sscanf(mode, "%ho", &tmpbuf.msg_perm.mode); /* Update the internal data structure */ if( msgctl( qid, IPC_SET, &tmpbuf) == -1) { return(-1); } return( }
信号量是一个可以用来控制多个进程存取共享资源的计数器。它经常作为一种锁定机制来防止当一个进程正在存取共享资源时,另一个进程也存取同一资源。下面先简要地介绍一下信号量中涉及到的数据结构。 1.内核中的数据结构semid_ds 和消息队列一样,系统内核为内核地址空间中的每一个信号量集都保存了一个内部的数据结构。数据结构的原型是semid_ds。它是在linux/sem.h中做如下定义的: /*One semid data structure for each set of semaphores in the system.*/ structsemid_ds{ structipc_permsem_perm;/*permissions..seeipc.h*/ time_tsem_otime;/*last semop time*/ time_tsem_ctime;/*last change time*/ structsem*sem_base;/*ptr to first semaphore in array*/ structwait_queue*eventn; structwait_queue*eventz; structsem_undo*undo;/*undo requestson this array*/ ushortsem_nsems;/*no. of semaphores in array*/ }; sem_perm是在linux/ipc.h定义的数据结构ipc_perm的一个实例。它保存有信号量集的存取权限的信息,以及信号量集创建者的有关信息。 sem_otime最后一次semop()操作的时间。 sem_ctime最后一次改动此数据结构的时间。 sem_base指向数组中第一个信号量的指针。 sem_undo数组中没有完成的请求的个数。 sem_nsems信号量集(数组)中的信号量的个数。
数据结构sem。它也是在linux/sem.h中定义的: /*One semaphore structure for each semaphore in the system.*/ structsem{ shortsempid;/*pid of las tOperation*/ ushortsemval;/*current value*/ ushortsemncnt;/*num procs awaiting increase in semval*/ ushortsemzcnt;/*num procs awaiting semval=0*/ }; sem_pid最后一个操作的PID(进程ID)。 sem_semval信号量的当前值。 sem_semncnt等待资源的进程数目。 sem_semzcnt等待资源完全空闲的进程数目。
/*semop systemcall takes an array of these*/ structsembuf{ ushortsem_num;/*semaphore index in array*/ shortsem_op;/*semaphore operation*/ shortsem_flg;/*operation flags*/ sem_num将要处理的信号量的个数。 sem_op要执行的操作。 sem_flg操作标志。
参数arg代表一个semun的实例。semun是在linux/sem.h中定义的: /*arg for semctl systemcalls.*/ unionsemun{ intval;/*value for SETVAL*/ structsemid_ds*buf;/*buffer for IPC_STAT&IPC_SET*/ ushort*array;/*array for GETALL&SETALL*/ structseminfo*__buf;/*buffer for IPC_INFO*/ void*__pad;
/* One shmid data structure for each shared memory segment in the system. */ struct shmid_ds { struct ipc_perm shm_perm; /* operation perms */ int shm_segsz; /* size of segment (bytes) */ time_t shm_atime; /* last attach time */ time_t shm_dtime; /* last detach time */ time_t shm_ctime; /* last change time */ unsigned short shm_cpid; /* pid of creator */ unsigned short shm_lpid; /* pid of last operator */
short shm_nattch; /* no. of current attaches */ /* the following are private */ unsigned short shm_npages; /* size of segment (pages) */ unsigned long *shm_pages; /* array of ptrs to frames -> SHMMAX */ struct vm_area_struct *attaches; /* descriptors for attaches */ };
shm_perm 是数据结构ipc_perm的一个实例。这里保存的是内存段的存取权限,和其他的有关内存段创建者的信息。 shm_segsz 内存段的字节大小。 shm_atime 最后一个进程存取内存段的时间。 shm_dtime 最后一个进程离开内存段的时间。 shm_ctime 内存段最后改动的时间。 shm_cpid 内存段创建进程的P I D。 shm_lpid 最后一个使用内存段的进程的P I D。 shm_nattch 当前使用内存段的进程总数。
系统调用: shmat(); 原型:int shmat ( int shmid, char *shmaddr, int shmflg); 返回值:假如成功,则返回共享内存段连接到进程中的地址。假如失败,则返回- 1:errno = EINVAL (无效的IPC ID 值或者无效的地址) ENOMEM (没有足够的内存) EACCES (存取权限不够) 假如参数a d d r的值为0,那么系统内核则试图找出一个没有映射的内存区域。我们推荐使用这种方法。你可以指定一个地址,但这通常是为了加快对硬件设备的存取,或者解决和其他程序的冲突。 下面的程序中的调用参数是一个内存段的I P C标识符,返回内存段连接的地址:
这样是否达到了我们的要求了呢?不尽如此,因为依靠时间的延迟执行同步是不可靠的。这里碰到的情形和一个分布程序和共享资源的情形一样。共享的资源是标准的输出设备,分布计算的程序是三个线程。 其实这里还有另外一个错误。函数sleep和函数e x i t一样和进程有关。当线程调用sleep时,整个的进程都处于睡眠状态,也就是说,所有的三个线程都进入睡眠状态。这样我们实际上没有解决任何的问题。希望使一个线程睡眠的函数是pthread_delay_np。例如让一个线程睡眠2秒钟,用如下程序:
(void *)&reader_function, NULL); w r i t e r _ f u n c t i o n ( ) ; } void writer_function(void) { w h i l e ( 1 ) { semaphore_down( &writers_turn ); b u ffer = make_new_item(); semaphore_up( &readers_turn ); } } void reader_function(void) { w h i l e ( 1 ) { semaphore_down( &readers_turn ); consume_item( buffer ); semaphore_up( &writers_turn ); } } 这个例子也没有完全地利用一般信号量的所有函数。我们可以使用信号量重新编写“Hello world” 的程序: void print_message_function( void *ptr ); Semaphore child_counter; Semaphore worlds_turn; main( ) { pthread_t thread1, thread2; char *message1 = "Hello"; char *message2 = "Wo r l d " ; semaphore_init( &child_counter ); semaphore_init( &worlds_turn ); semaphore_down( &worlds_turn ); /* world goes second */ semaphore_decrement( &child_counter ); /* value now 0 */ semaphore_decrement( &child_counter ); /* value now -1 */ /* * child_counter now must be up-ed 2 times for a thread blocked on it * to be released * * / pthread_create( &thread1, pthread_attr_default, (void *) &print_message_function, (void *) message1); semaphore_down( &worlds_turn ); pthread_create(&thread2, pthread_attr_default, (void *) &print_message_function, (void *) message2); semaphore_down( &child_counter ); /* not really necessary to destroy since we are exiting anyway */ semaphore_destroy ( &child_counter ); semaphore_destroy ( &worlds_turn ); e x i t ( 0 ) ; } void print_message_function( void *ptr ) { char *message; message = (char *) ptr; printf("%s ", message); fflush(stdout); semaphore_up( &worlds_turn ); semaphore_up( &child_counter ); p t h r e a d _ e x i t ( 0 ) ; } 信号量c h i l d _ c o u n t e r用来强迫父线程阻塞,直到两个子线程执行完p r i n t f语句和其后的semaphore_up( &child_counter )语句才继续执行。 Semaphore.h
#ifndef SEMAPHORES #define SEMAPHORES #include #include typedef struct Semaphore { int v; pthread_mutex_t mutex; pthread_cond_t cond; } S e m a p h o r e ; int semaphore_down (Semaphore * s); int semaphore_decrement (Semaphore * s); int semaphore_up (Semaphore * s); void semaphore_destroy (Semaphore * s); void semaphore_init (Semaphore * s); int semaphore_value (Semaphore * s); int tw_pthread_cond_signal (pthread_cond_t * c); int tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m); int tw_pthread_mutex_unlock (pthread_mutex_t * m); int tw_pthread_mutex_lock (pthread_mutex_t * m); void do_error (char *msg); # e n d i f
Semaphore.c
#include "semaphore.h" / *
* function must be called prior to semaphore use. * * / v o i d semaphore_init (Semaphore * s) { s->v = 1; if (pthread_mutex_init (&(s->mutex), pthread_mutexattr_default) == -1) do_error ("Error setting up semaphore mutex"); if (pthread_cond_init (&(s->cond), pthread_condattr_default) == -1) do_error ("Error setting up semaphore condition signal"); * function should be called when there is no longer a need for * the semaphore. * * / v o i d semaphore_destroy (Semaphore * s) { if (pthread_mutex_destroy (&(s->mutex)) == -1) do_error ("Error destroying semaphore mutex"); if (pthread_cond_destroy (&(s->cond)) == -1) do_error ("Error destroying semaphore condition signal"); } / * * function increments the semaphore and signals any threads that * are blocked waiting a change in the semaphore. * * / i n t semaphore_up (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); ( s - > v ) + + ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); tw_pthread_cond_signal (&(s->cond)); return (value_after_op); } / * * function decrements the semaphore and blocks if the semaphore is * <= 0 until another thread signals a change. * * / i n t semaphore_down (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); while (s->v <= 0) { tw_pthread_cond_wait (&(s->cond), &(s->mutex)); } ( s - > v ) - - ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } / * * function does NOT block but simply decrements the semaphore. * should not be used instead of down -- only for programs where * multiple threads must up on a semaphore before another thread * can go down, i.e., allows programmer to set the semaphore to * a negative value prior to using it for synchronization. * * / i n t semaphore_decrement (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); s - > v - - ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } / * * function returns the value of the semaphore at the time the * critical section is accessed. obviously the value is not guarenteed * after the function unlocks the critical section. provided only * for casual debugging, a better approach is for the programmar to * protect one semaphore with another and then check its value. * an alternative is to simply record the value returned by semaphore_up
* or semaphore_down. * * / i n t semaphore_value (Semaphore * s) { /* not for sync */ int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } /* -------------------------------------------------------------------- */ /* The following functions replace standard library functions in that */ /* they exit on any error returned from the system calls. Saves us */ /* from having to check each and every call above. */ /* -------------------------------------------------------------------- */ i n t tw_pthread_mutex_unlock (pthread_mutex_t * m) { int return_value; if ((return_value = pthread_mutex_unlock (m)) == -1) do_error ("pthread_mutex_unlock"); return (return_value); } i n t tw_pthread_mutex_lock (pthread_mutex_t * m) { int return_value; if ((return_value = pthread_mutex_lock (m)) == -1) do_error ("pthread_mutex_lock"); return (return_value); } i n t tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m) { int return_value; if ((return_value = pthread_cond_wait (c, m)) == -1) do_error ("pthread_cond_wait"); return (return_value); } i n t tw_pthread_cond_signal (pthread_cond_t * c) { int return_value; if ((return_value = pthread_cond_signal (c)) == -1) do_error ("pthread_cond_signal"); return (return_value); } / * * function just prints an error message and exits * * / v o i d do_error (char *msg) { perror (msg); exit (1); }