/* The available functions are: p2_initenv() p2_create_procgroup() p2_wait_for_end() p2_get_my_id() p2_num_total_slaves() p2_clock() p2_send() p2_sendr() p2_recv() p2_msg_free() p2_messages_available() void p2_malloc_failed(); void p2_exit(); */ #include /*typedef int SEMAPHORE, LOCK; -- for PC compiling*/ #include #include "p2.h" #define MAX_PROCS 20 #define SHARE_MEM_SIZE 4*1024*1024 typedef struct msg_struct { struct msg_struct *next; int rendezvous; int sender; int type; int length; } MSG, *MSG_PTR; typedef struct { MSG_PTR first, last; LOCK input_lock; int waiting; SEMAPHORE wait; SEMAPHORE rendezvous; } INPUT, *INPUT_PTR; typedef struct { INPUT proc[MAX_PROCS]; } SHARE_INFO, *SHARE_INFO_PTR; static SHARE_INFO_PTR share_info; static int num_slaves=0; static int my_id=0; static unsigned int *timer_ptr; static MSG first={NULL,0,0,0,0}; /* fake first message on the local queue*/ static MSG_PTR last = &first; /* The first executable statement in a p2 master program should be: p2_initenv(); This initializes the p2 system. void p2_initenv() should be called by your master process before an attempt is made to start any processes or to use any p2 procedures or data areas. We suggest making it the first executable statement in your master program. */ void p2_initenv() {int i; INPUT_PTR p; timer_ptr = timer_init(); if(share_malloc_init(SHARE_MEM_SIZE) != 0) { fprintf(stderr,"share_malloc_init failure.\n"); exit(1); } share_info = (SHARE_INFO_PTR)share_malloc(sizeof(SHARE_INFO)); for (i=0;iproc[i]; p->first = p->last = NULL; p->waiting = 0; spin_init(&p->input_lock,PAR_UNLOCKED); semaphore_init(&p->wait,PROCESS_BLOCK); semaphore_set(&p->wait,0); semaphore_init(&p->rendezvous,PROCESS_BLOCK); semaphore_set(&p->rendezvous,0); } } /* The statement: p2_create_procgroup(n); creates the set of n slaves. Obviously this statement must be executed before any slaves can be assumed to exist. void p2_create_procgroup(n) int n; The p2-managed processes are automatically assigned unique id's (beginning with 0 for the master), they have message queues allocated for them so that they can do message-passing, and they are able to run either on a shared-memory multiprocessor with the creating process or they can run on a separate machine, although in our implementation they will all run on the same machine (the encore). Each created process executed a parameterless procedure named "slave". To terminate, it returns from the slave procedure. message send/receive procedures */ void p2_create_procgroup(n) {extern void slave(); num_slaves = n; my_id = proc_fork(n+1); if (my_id > 0) { slave(); proc_join(); } } /* unsigned int p2_clock() returns an integer value indicating the current time. The resolution of the clock is hardware dependent. You can subtract one clock time from another to determine the length of time that some computation takes. */ unsigned int p2_clock() { return *timer_ptr; } /* Pp supports a set of send/receive procedures. These procedures are "generic" in the sense that they do not know whether a message must travel across a network or through shared memory. They depend on a lower-level set of procedures that handle local or network (remote) communications. By default, the messages are assumed to be typed. If the user wishes to use untyped messages, he can hide the typing by coding some very simple C macros that always use a single message type. */ /* void p2_send(type,to,msg,len) int type,to,len; char *msg; sends the message (a character string) of the specified type to the process with p2-id given in "to". The length of the message is given in "len". */ void p2_send(type,to,msg,len) int type,to,len; char *msg; {MSG_PTR p; INPUT_PTR q; int wake_up; p = (MSG_PTR)share_malloc(sizeof(MSG)+len); if (!p) { fprintf(stderr,"share_malloc failure!\n"); exit(1); } p->next = NULL; p->length = len; p->sender = my_id; p->type = type; p->rendezvous = 0; memcpy((char *)(p+1),msg,len); q = &share_info->proc[to]; spin_lock(&q->input_lock); if (q->first==NULL) { q->first = q->last = p; } else { q->last->next = p; q->last = p; } wake_up = q->waiting; q->waiting = 0; spin_unlock(&q->input_lock); if (wake_up) semaphore_signal(&q->wait); } /* void p2_sendr(type,to,msg,len) int type,to,len; char *msg; stands for p2_send with "rendezvous". It is identical to p2_send with one exception, it waits for an acknowledgement that the message was received. Note that this acknowledgement is provided by p2 and is not required to be sent by the user program. */ void p2_sendr(type,to,msg,len) int type,to,len; char *msg; {MSG_PTR p; INPUT_PTR q; int wake_up; p = (MSG_PTR)share_malloc(sizeof(MSG)+len); if (!p) { fprintf(stderr,"share_malloc failure!\n"); exit(1); } p->next = NULL; p->length = len; p->sender = my_id; p->type = type; p->rendezvous = 1; memcpy((char *)(p+1),msg,len); q = &share_info->proc[to]; spin_lock(&q->input_lock); if (q->first==NULL) { q->first = q->last = p; } else { q->last->next = p; q->last = p; } wake_up = q->waiting; q->waiting = 0; spin_unlock(&q->input_lock); if (wake_up) semaphore_signal(&q->wait); semaphore_wait(&share_info->proc[my_id].rendezvous); } static MSG_PTR find_message(type,from) int type,from; { MSG_PTR p,q; /*q is message being considered, p is previous message*/ for (p = &first, q = first.next; q!=NULL; p = q, q = q->next) { if ((q->type == type || type == -1 ) && (q->sender== from || from == -1) ) { return p; } } return NULL; } /* int p2_messages_available(req_type,req_from) int *req_type,*req_from; returns a BOOL value indicating whether the process has any messages available or not. Req_type and req_from are both pointers to integers; they are used as both input and arguments. On input, req_type has a value that indicates the type of message that the user wishes to check for availability (-1 indicates any type). Req_from is used similarly to indicate who a message is desired from. */ int p2_messages_available(req_type,req_from) int *req_type,*req_from; { MSG_PTR p,q; INPUT_PTR in = &share_info->proc[my_id]; spin_lock(&in->input_lock); if (in->first!=NULL) { last->next = in->first; last = in->last; in->first = NULL; } spin_unlock(&in->input_lock); if ( (p = find_message(*req_type,*req_from) ) != NULL) { q = p->next; *req_type = q->type; *req_from = q->sender; return 1; } else { return 0; } } /* void p2_recv(req_type,req_from,msg,len_rcvd) int *req_type,*req_from,*len_rcvd; char **msg; takes four arguments. The msg argument is a "pointer to a pointer" to char that is assigned the address of a received message. The len_rcvd argument is a pointer to an integer that is assigned the length of the received message. Req_type and Req_from are both pointers to integers; they are used as both input and arguments. On input, req_type has a value that indicates the type of message that the user wishes to receive (-1 indicates any type). It will block until a message of that type is available. Req_from is used similarly to indicate who a message is desired from. One important note about this procedure is that it obtains the area in which to place a message, and the user must explicitly free that area when finished with it (see p2_msg_free below). */ void p2_recv(req_type,req_from,msg,len_rcvd) int *req_type,*req_from,*len_rcvd; char **msg; { MSG_PTR p,q; INPUT_PTR in = &share_info->proc[my_id]; while (1) { spin_lock(&in->input_lock); if (in->first!=NULL) { last->next = in->first; last = in->last; in->first = NULL; } spin_unlock(&in->input_lock); if ( (p = find_message(*req_type,*req_from) ) != NULL) { q = p->next; p->next = q->next; q->next = NULL; /*just on principle*/ if (p->next == NULL) last = p; *req_type = q->type; *req_from = q->sender; *len_rcvd = q->length; *msg = (char *)(q+1); if (q->rendezvous) semaphore_signal(&share_info->proc[q->sender].rendezvous); return ; } else { /*not found*/ spin_lock(&in->input_lock); if (in->first != NULL) { spin_unlock(&in->input_lock); } else { in->waiting = 1; spin_unlock(&in->input_lock); semaphore_wait(&in->wait); } } } } /* void p2_msg_free(m) char *m; frees the message pointed to by "m". This procedure should be used for this task because a message has hidden information which the user is unaware of and therefore cannot free himself. */ void p2_msg_free(m) MSG_PTR m; { share_free(m-1); } /* int p2_get_my_id() returns an integer value representing the id of the process assigned by the p2 system. */ int p2_get_my_id() { return my_id; } /* int p2_num_total_slaves() returns an integer value indicating the total number of slaves started by p2_create_procgroup. */ int p2_num_total_slaves() { return num_slaves; } /* void p2_wait_for_end() is the p2 termination/cleanup procedure that you should invoke at the end of every execution of a program that uses p2. It should be invoked exactly once at the end of the run (in the master process). It does some termination processing and then waits for p2-managed slave processes to end. */ void p2_wait_for_end() { proc_join(); } void p2_exit(L); { exit(L); } void p2_malloc_failed() { fprintf(stderr,"(share-)malloc failed on node d\n",my_id); p2_exit(1); }