AMDC: Programming Techniques with Processes

September 23, 1996.

Updated December 10, 1996 by K. Cablk.

Released to the public domain, 1996, by Thomas W. Christopher

AMDC allows, indeed requires, a number of exotic programming techniques. We exhibit several of them here.

Warning: this document includes code that has not yet been tested. It's important to get the ideas out quickly.

Direct communication

You can write programs composed of a number of processes that communicate directly with each other. To run your program on five nodes, use

yourprog -n5

Each node will run a copy of your program.

Your main function is named Main and will need to know its location and its location's name:

/* file-scope variables, visible to every procedure */
Location my_loc;
LocName my_name;

int Main(int argc, char* argv[])
{.../*declarations*/
  my_loc=amdc_getMyLoc();
  my_name=amdc_getNameOfLoc(my_loc);
  ... /*actions*/
}

To send a message to another process, you can use a procedure like this:

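void p_send(int dst, Tag t, char* body, int leng)
{ Message p;
  LocName L;
  L = amdc_0DLocName(&L,AMDC_PROCESS_SYMBOL); /* a process location name */
  L.X[0]=dst;                  /* selected by destination process number */
  p = amdc_rawMsg(leng);       /* allocate a message body of leng bytes */
  memcpy((char*)p,body,leng);  /* copy the data in (needs <string.h>) */
  amdc_sendToAs(p,L,t);        /* send it to location L with tag t */
}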

To receive a message with a particular tag, waiting for it to arrive and returning a pointer to it, use a procedure like this:

Message p_recv(Tag t)
{ Message p;
  while ((p=amdc_getLoc(my_loc,t))==NULL) {
          amdc_pollBlock();
  }
  return p;
}
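
To make the tag matching in p_send and p_recv concrete, here is a single-process sketch that models each node's table as an array of tagged messages kept in arrival order. Everything in it (SimMsg, Mailbox, sim_send, sim_tryRecv, the fixed limits) is hypothetical and not part of AMDC. sim_tryRecv stands for a single call of amdc_getLoc; p_recv repeats such a call, with amdc_pollBlock in between, until a message arrives.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical single-process model of per-node tagged message tables. */
#define MAX_NODES 8
#define MAX_MSGS  32

typedef int Tag;

typedef struct {
    Tag   tag;
    char *body;   /* heap copy of the bytes that were sent */
    int   leng;
} SimMsg;

typedef struct {
    SimMsg msgs[MAX_MSGS];   /* kept in arrival order */
    int    count;
} Mailbox;

static Mailbox boxes[MAX_NODES];

/* Like p_send: copy the body into a fresh message and deliver it to node dst. */
void sim_send(int dst, Tag t, const char *body, int leng)
{
    assert(dst >= 0 && dst < MAX_NODES && leng >= 0);
    Mailbox *mb = &boxes[dst];
    assert(mb->count < MAX_MSGS);
    char *copy = malloc(leng > 0 ? (size_t)leng : 1);
    assert(copy != NULL);
    memcpy(copy, body, (size_t)leng);
    mb->msgs[mb->count].tag = t;
    mb->msgs[mb->count].body = copy;
    mb->msgs[mb->count].leng = leng;
    mb->count++;
}

/* One non-blocking attempt: remove and return the oldest message with
   tag t at node me, or NULL if none has arrived yet. */
char *sim_tryRecv(int me, Tag t, int *leng)
{
    Mailbox *mb = &boxes[me];
    for (int i = 0; i < mb->count; i++) {
        if (mb->msgs[i].tag == t) {
            char *body = mb->msgs[i].body;
            if (leng != NULL) *leng = mb->msgs[i].leng;
            /* close the gap so the remaining messages keep their order */
            memmove(&mb->msgs[i], &mb->msgs[i + 1],
                    (size_t)(mb->count - i - 1) * sizeof(SimMsg));
            mb->count--;
            return body;
        }
    }
    return NULL;
}
```

A message with a different tag never satisfies the receive, so a process can wait for one kind of reply while other traffic accumulates in its table.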

Shared records

The processes can share records that are stored in messages at locations. Here we present a simple version of shared records based on the Memo system.

Every record will have its own location name. Arrays of records have location names containing the same symbol (serving as the array name) and the subscripts in their X fields.

The operations available for records are to store records at locations, fetch records from locations, and fetch copies of records from locations.

To construct location names, you can use these declarations:

LocName loc3D(Symbol S,
        unsigned long X0,
        unsigned long X1,
        unsigned long X2)
{        LocName L;
        L.S=S;
        L.X[0]=X0;
        L.X[1]=X1;
        L.X[2]=X2;
        return L;
}
LocName loc2D(Symbol S,
        unsigned long X0,
        unsigned long X1)
{ return loc3D(S,X0,X1,0L); }
LocName loc1D(Symbol S,
        unsigned long X0)
{ return loc3D(S,X0,0L,0L); }
LocName loc0D(Symbol S)
{ return loc3D(S,0L,0L,0L); }
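
Because the lower-dimensional helpers pad the unused subscripts with zero, loc1D(S,i) and loc2D(S,i,0) produce identical location names, so each symbol is best used with a single dimensionality. The sketch below checks this with stand-in definitions of Symbol and LocName and a hypothetical locEqual helper; the real AMDC type definitions may differ.

```c
#include <assert.h>

/* Stand-in types for illustration; the real AMDC definitions may differ. */
typedef unsigned long Symbol;
typedef struct { Symbol S; unsigned long X[3]; } LocName;

LocName loc3D(Symbol S, unsigned long X0, unsigned long X1, unsigned long X2)
{
    LocName L;
    L.S = S;
    L.X[0] = X0;
    L.X[1] = X1;
    L.X[2] = X2;
    return L;
}
LocName loc2D(Symbol S, unsigned long X0, unsigned long X1)
{ return loc3D(S, X0, X1, 0UL); }
LocName loc1D(Symbol S, unsigned long X0)
{ return loc3D(S, X0, 0UL, 0UL); }

/* Hypothetical helper: in this model two names denote the same location
   exactly when the symbol and all three subscripts agree. */
int locEqual(LocName a, LocName b)
{
    return a.S == b.S && a.X[0] == b.X[0] && a.X[1] == b.X[1] && a.X[2] == b.X[2];
}
```

In this model, swapping subscripts or changing the symbol always yields a different location, while padding with zeros does not.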

Each record will sit in its own location in a message tagged DATA. It may be fetched from its location, removing it; it may be stored back in its location; it may have a copy of it fetched.

When a record is fetched, it is, in effect, locked. It will be absent from its location for a while, until it is stored back. This is exactly what you want to protect data structures from multiple writers.

In outline, the operations are simple:

  • To store a record in a location, send it as a message to the location with the tag DATA and a script that will put it in the table there.
  • To fetch a record, send a message to the location that will remove the DATA message and send it back to the requester.
  • To fetch a copy of a record, send a message that will remove the DATA message, copy it, deposit it back at the location, and return the copy to the sender.

Unfortunately, there are some interactions among these operations that have to be considered. They all come down to this:

What if a fetch arrives when the record is not present?

Clearly the fetch message should wait for the record to show up, so the fetch needs to be deposited in the table.

That means the script to store the record will have to look in the table for fetch messages to handle. The easiest way to handle them is to enqueue them in the location's input queue so that their scripts are tried again. That, however, may place the fetches behind later-arriving fetches. The store script can overcome this by dispatching the waiting messages immediately. We show code for both.

Fetching records

Procedure called by a process to fetch a record:

The procedure called by a process to fetch a record is a kind of remote procedure call. It sends out a message with a procedure to execute, doFetch, and includes in the message a return address: the destiny consisting of location my_name, tag FETCHED, and script amdc_rawScript. It is a synchronous subroutine: it loops, waiting for the fetched message to come back, before it returns.

Message fetch(LocName L)
{ Destiny *d=(Destiny*)amdc_rawMsg(sizeof(Destiny));
  Message p;
  amdc_initDest(d,my_name,FETCHED,amdc_rawScript);
  amdc_sendToAsDo((Message)d,L,FETCH,doFetch);
  while ((p=amdc_getLoc(my_loc,FETCHED))==NULL)
          amdc_pollBlock();
  return p;
}

This procedure may be called to return a record with structure R from the distributed array element S[i] by something resembling:

p = (struct R*)fetch(loc1D(S,i));

Script to fetch a record

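Here is a script to fetch a record. Notice that the record itself is sent back to the requester. If the record is present, the script frees the request message; otherwise it deposits the request in the location's table to wait for the record to be stored.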

void doFetch(Message m, Location loc)
{ Message p;
  if ((p=amdc_getLoc(loc,DATA))!=NULL) {
       amdc_sendToDest(p,(Destiny*)m);
       amdc_freeMsg(m);
  } else amdc_putLoc(loc,m);
}

Scripts to store a record

Re-enqueueing the fetches

Here is a script to store a record back in its location. All the waiting fetch-copies are placed in the input queue first, allowing each of them to get a copy of the record before the fetch removes it. However, as mentioned above, this code has the defect of placing the waiting fetches at the end of the input queue. A later-arriving fetch could then get the record first. That not only isn't fair, it also forces the re-enqueued fetches to deposit themselves back in the table until the record is stored again.

void doStore(Message m,Location loc)
{ Message p;
  amdc_putLoc(loc,m);
  while ((p=amdc_getLoc(loc,FETCHCOPY))!=NULL)
         amdc_enqueueAtLoc(loc,p);
  amdc_enqueueAtLoc(loc,amdc_getLoc(loc,FETCH));
    /*amdc_enqueueAtLoc is a no-op 
       if the message==NULL*/
}

Dispatching the fetches

Here is another attempt at the store script. It avoids the problem with the first version by executing the scripts of the waiting fetch messages immediately.

void doStore(Message m, Location loc)
{ Message p;
  amdc_putLoc(loc,m);
  while ((p=amdc_getLoc(loc,FETCHCOPY))!=NULL)
         (amdc_getScript(p))(p,loc);
  if ((p=amdc_getLoc(loc,FETCH))!=NULL)
        (amdc_getScript(p))(p,loc);
}
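
The ordering argument for the two versions can be checked with a small single-threaded model of one location. It assumes a FIFO input queue and a location holding at most one record; Event, Queue, and firstToGetRecord are hypothetical, and FETCHCOPY messages are left out. In the scenario, a fetch arrives before the store and waits in the table, and a second fetch arrives behind the store.

```c
#include <assert.h>

/* Hypothetical model of one location: a FIFO input queue, a table of
   waiting fetches, and at most one record. Not the AMDC API. */
enum Kind { STORE, FETCH };
typedef struct { enum Kind kind; int id; } Event;

#define QMAX 16
typedef struct { Event q[QMAX]; int head, tail; } Queue;

static void initQueue(Queue *q) { q->head = q->tail = 0; }
static void push(Queue *q, Event e) { assert(q->tail < QMAX); q->q[q->tail++] = e; }
static int pop(Queue *q, Event *e)
{
    if (q->head == q->tail) return 0;
    *e = q->q[q->head++];
    return 1;
}

/* Process the input queue until it is empty and return the id of the
   first fetch to obtain the record (-1 if none). dispatch != 0 models the
   second doStore (run the waiting fetch at once); 0 models the first
   (re-enqueue it at the end of the input queue). */
int firstToGetRecord(Queue *input, int dispatch)
{
    int present = 0, first = -1;
    Queue table;                       /* fetches deposited in the table */
    Event e, w;
    initQueue(&table);
    while (pop(input, &e)) {
        if (e.kind == STORE) {
            present = 1;
            if (pop(&table, &w)) {
                if (dispatch) {        /* the waiting fetch runs now */
                    present = 0;
                    if (first < 0) first = w.id;
                } else {
                    push(input, w);    /* lands behind later arrivals */
                }
            }
        } else if (present) {          /* FETCH with the record present */
            present = 0;
            if (first < 0) first = e.id;
        } else {                       /* FETCH with the record absent: wait */
            push(&table, e);
        }
    }
    return first;
}
```

With re-enqueueing, the later fetch (id 2) gets the record and the fetch that had been waiting (id 1) goes back into the table; with dispatching, the waiting fetch is served first.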

Procedure to store a record

The procedure that a process calls to store a record is asynchronous. It does not wait for the record to be stored before returning to the process that called it.

void storeAt(Message m,LocName L)
{
  amdc_sendToAsDo(m,L,DATA,doStore);
}

Fetching copies of records

Script to fetch a copy of a record

void doFetchCopy(Message m, Location loc)
{ Message p;
  if ((p=amdc_getLoc(loc,DATA))!=NULL) {
    amdc_sendToDest(amdc_copyMsg(p),(Destiny*)m);
    amdc_putLoc(loc,p);
    amdc_freeMsg(m);
  } else amdc_putLoc(loc,m);
}

Procedure called by a process to fetch a copy of a record:

Message fetchCopy(LocName L)
{ Destiny *d=(Destiny*)amdc_rawMsg(sizeof(Destiny));
  Message p;
  amdc_initDest(d,my_name,FETCHED,amdc_rawScript);
  /* the request must run doFetchCopy, not doFetch */
  amdc_sendToAsDo((Message)d,L,FETCHCOPY,doFetchCopy);
  while ((p=amdc_getLoc(my_loc,FETCHED))==NULL)
          amdc_pollBlock();
  return p;
}
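
The combined behavior of doFetch, doFetchCopy, and the dispatching doStore can be sketched with a model location whose record is a single int. The Loc struct, the reply log, and the integer payload are hypothetical simplifications; in AMDC the record is a message and each reply goes to the request's Destiny.

```c
#include <assert.h>

/* Hypothetical model of one record location; not the AMDC API. The record
   is an int, and the location holds at most one DATA message. */
#define MAXW 8
typedef struct {
    int hasData, data;                 /* the DATA message, if present */
    int copyWaiters[MAXW], nCopy;      /* deposited FETCHCOPY requests */
    int fetchWaiters[MAXW], nFetch;    /* deposited FETCH requests, FIFO */
    int replyTo[MAXW], replyVal[MAXW], nReplies;  /* log of messages sent back */
} Loc;

static void reply(Loc *l, int who, int v)
{
    assert(l->nReplies < MAXW);
    l->replyTo[l->nReplies] = who;
    l->replyVal[l->nReplies] = v;
    l->nReplies++;
}

/* Like doFetch: remove the record and send it back, or wait in the table. */
void doFetch(Loc *l, int who)
{
    if (l->hasData) { l->hasData = 0; reply(l, who, l->data); }
    else { assert(l->nFetch < MAXW); l->fetchWaiters[l->nFetch++] = who; }
}

/* Like doFetchCopy: send back a copy and leave the record in place. */
void doFetchCopy(Loc *l, int who)
{
    if (l->hasData) reply(l, who, l->data);
    else { assert(l->nCopy < MAXW); l->copyWaiters[l->nCopy++] = who; }
}

/* Like the dispatching doStore: deposit the record, run every waiting
   fetch-copy, then run at most one waiting fetch. */
void doStore(Loc *l, int v)
{
    l->hasData = 1;
    l->data = v;
    int n = l->nCopy;
    l->nCopy = 0;
    for (int i = 0; i < n; i++) doFetchCopy(l, l->copyWaiters[i]);
    if (l->nFetch > 0) {
        int who = l->fetchWaiters[0];
        for (int i = 1; i < l->nFetch; i++) l->fetchWaiters[i - 1] = l->fetchWaiters[i];
        l->nFetch--;
        doFetch(l, who);
    }
}
```

Two fetch-copies and one fetch that arrive while the record is absent are all satisfied by the next store: both copies are answered before the fetch removes the record, which then stays absent until it is stored back.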

Unordered queues

The messages with the same tag in a table at a location are kept in a queue. If you send more than one message to a location to be stored, they will be queued. Thus you can use storeAt and fetch to implement a named queue.

Although the queue at the location is FIFO, you are not guaranteed to get a FIFO queue between two processes this way. The problem is that the storeAt procedure does not wait for the message to be actually placed in the table at the location before it returns. One process could execute

storeAt(p,L);
storeAt(q,L);

but q could end up in L's input queue before p. AMDC's run time system does not promise that messages will be delivered in the order sent. (Actually, almost all implementations will deliver messages in the order sent, but we do not guarantee it.)

So if delivery in the order sent is not essential, storeAt and fetch are enough to implement a named queue.

Locks and semaphores

Locks.

Locks are used by parallel programs to protect critical sections of code. A critical section is a section of code that manipulates a shared data structure. The fetch and store operations for shared records implicitly lock the records they fetch and store.

However, if the processes must access several records at separate locations, they may need to lock the group. An explicit lock can simply be represented by an empty message at a location. Assume we have the declarations:

LocName lock;
Message q;
...

We can initialize a lock by storing an empty message. This, of course, is done by only one process.

void initLock(LocName lock)
{
  storeAt(amdc_rawMsg(0),lock);
}

Locking and unlocking can be done with the following procedures:

void lockup(LocName lock)
{
  amdc_freeMsg(fetch(lock));
}
void unlock(LocName lock)
{
  storeAt(amdc_rawMsg(0),lock);
}

All processes that wish to enter the critical section use the following pattern:

lockup(lock);
... /* perform critical section */
unlock(lock);

Semaphores.

A semaphore may be viewed as a generalization of locks. A semaphore has a non-negative count and two operations, down and up.

The up operation adds one to the count.

The down operation subtracts one from the count when the count is greater than zero. It can't subtract one if the count is zero, since the count must not be negative. If the count is zero when the process first attempts the down, the process waits until some other process does an up.

The simplest implementation of a counting semaphore is similar to a lock, except that to initialize the semaphore, a process places as many empty messages in the semaphore's location as are required by the initial count.

void initSemaphore(LocName sema, int c)
{
  for(;c>0;c--) storeAt(amdc_rawMsg(0),sema);
}
void down(LocName sema)
{
  amdc_freeMsg(fetch(sema));
}
void up(LocName sema)
{
  storeAt(amdc_rawMsg(0),sema);
}
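
Since a lock is just a semaphore whose initial count is one, one model covers both. In this sketch a semaphore location holds a count of empty messages plus the fetch requests deposited while it was empty. Sema, semInit, semDown, and semUp are hypothetical names, and releasing waiters in FIFO order is a modeling assumption, since AMDC does not guarantee the order in which the requests arrive.

```c
#include <assert.h>

/* Hypothetical model: a semaphore location holds `tokens` empty messages
   plus the fetches deposited while it had none. Not the AMDC API. */
#define MAXWAIT 16
typedef struct {
    int tokens;
    int waiting[MAXWAIT], head, tail;  /* FIFO of blocked process ids */
} Sema;

void semInit(Sema *s, int c) { s->tokens = c; s->head = s->tail = 0; }

/* down = fetch one empty message. Returns 1 if the caller may proceed now,
   0 if its fetch had to wait in the table. */
int semDown(Sema *s, int pid)
{
    if (s->tokens > 0) { s->tokens--; return 1; }
    assert(s->tail < MAXWAIT);
    s->waiting[s->tail++] = pid;
    return 0;
}

/* up = store one empty message. If a fetch is waiting, the message goes
   straight to it and the released pid is returned; otherwise -1. */
int semUp(Sema *s)
{
    if (s->head < s->tail) return s->waiting[s->head++];
    s->tokens++;
    return -1;
}
```

With an initial count of one, a second process attempting down waits until the first one's up hands it the single empty message, which is exactly the lockup/unlock pattern.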

Ordered queues

A queue can simply be a location at which messages are stored and fetched, but storeAt and fetch do not guarantee that the messages will be fetched in the order stored, so they implement an unordered queue. If it is important that messages be fetched in the order they were stored, you need to implement an ordered queue.

One technique is to create a one-dimensional array of location names. The producer stores into the array elements sequentially. The consumer fetches from them sequentially. Think of each location as coming into existence as it is stored into and going out of existence as it is fetched from. Such an array is often called a stream.

For example, the producer can store messages into locations thus:

#define QUEUE ...
...
LocName tail;
Message M;
int outpos = 0;
tail.S = QUEUE;
tail.X[1] = tail.X[2] = 0;
...
while (1) {
    ... produce item M ...
    tail.X[0] = outpos++;   /* next stream element */
    storeAt(M,tail);
}

And the consumer can remove them similarly:

LocName head;
Message p;
int inpos = 0;
head.S = QUEUE;
head.X[1] = head.X[2] = 0;
...
head.X[0] = inpos++;   /* next stream element, in order */
p = fetch(head);
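
A stream keeps order even when the stores arrive out of order, because the consumer asks for element inpos and nothing else. The model below marks each element present when its storeAt arrives and absent again once it is fetched; Stream, arrive, tryFetchNext, and the fixed STREAM_LEN are hypothetical.

```c
#include <assert.h>

/* Hypothetical model of a stream QUEUE[0..STREAM_LEN-1]; not the AMDC API. */
#define STREAM_LEN 8
typedef struct {
    int present[STREAM_LEN];  /* 1 while QUEUE[i] holds a message */
    int value[STREAM_LEN];    /* the message stored at QUEUE[i] */
    int inpos;                /* consumer's next index */
} Stream;

/* The storeAt for element i arrives, possibly out of order. */
void arrive(Stream *s, int i, int v)
{
    assert(i >= 0 && i < STREAM_LEN && !s->present[i]);
    s->present[i] = 1;
    s->value[i] = v;
}

/* Consumer: fetch element inpos if it has arrived, removing it.
   Returns 1 and advances on success, 0 if the consumer must keep waiting. */
int tryFetchNext(Stream *s, int *v)
{
    if (s->inpos >= STREAM_LEN || !s->present[s->inpos]) return 0;
    s->present[s->inpos] = 0;
    *v = s->value[s->inpos++];
    return 1;
}
```

If elements 0 to 3 are delivered in the order 2, 0, 3, 1, the consumer still receives them as 0, 1, 2, 3: it takes element 0, then stalls on element 1 even though 2 and 3 are already present.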

Barriers

In parallelizing a loop, it is often important to synchronize the processes executing the loop at one or more points within it using a barrier. If a barrier is initialized with a count N, then N processes must wait at the barrier before any of them are allowed to proceed. The barrier is automatically reinitialized so that they can synchronize over and over again at the same barrier. In AMDC, a barrier can be implemented with a stream (one-dimensional array) and a single message containing an integer count. Each time a process waits at a barrier, it synchronizes by looking at the next stream element. The presence of the message at a location allows the processes to proceed past the barrier.

When a process comes to a barrier, it fetches the message and decrements the count in it. If the count is still greater than zero, this is not the last process to arrive, so it stores the message back and then waits to fetch a copy of the message at the next location, which is not present yet.

If the process decrements the count to zero, then it is the last process to arrive. It stores a message with the full count of the number of processes to synchronize into the next location and continues executing.

Code for this follows:

#define BARRIER ...
LocName barrier;
int num_at_barrier=NUM_TO_SYNCHRONIZE;
int *p;
...
barrier = amdc_initLocName0D(BARRIER);
...
/* initialize the barrier (done by one process only) */
if (amdc_getMyId()==0) {
        p=(int*)amdc_rawMsg(sizeof(int));
        *p = num_at_barrier;
        storeAt((Message)p,barrier);
}
...
/*loop including synchronization at the barrier*/
while (TRUE) {
    .../*do some work*/
    ...
    /* synchronize at the barrier: fetching the count locks it */
    p = (int *) fetch(barrier);
    (*p)--;
    if (*p==0) {
        /* I'm the last to arrive: a fresh count in the next
           stream element releases the waiting processes */
        barrier.X[0]++;
        *p = num_at_barrier;
        storeAt((Message)p,barrier);
    } else {
        /* I am not the last to arrive: put the count back,
           then wait for the next stream element to appear */
        storeAt((Message)p,barrier);
        barrier.X[0]++;
        p = (int*) fetchCopy(barrier);
        amdc_freeMsg((Message)p);
    }
    ... /*do some more work*/
}
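
The counting logic can be traced in a single-threaded model. Here slot[k] stands for the count message at BARRIER[k] (-1 meaning absent) and idx[pid] for each process's own copy of barrier.X[0]. Barrier, barrierInit, barrierArrive, barrierReleased, NPROC, and ROUNDS are hypothetical, and the model assumes a process fetches the count only while no other process holds it, which the blocking fetch enforces in the real code.

```c
#include <assert.h>

/* Hypothetical single-threaded model of the stream barrier; not AMDC. */
#define NPROC  3
#define ROUNDS 8
typedef struct { int slot[ROUNDS]; int idx[NPROC]; } Barrier;

void barrierInit(Barrier *b)
{
    for (int k = 0; k < ROUNDS; k++) b->slot[k] = -1;
    for (int p = 0; p < NPROC; p++) b->idx[p] = 0;
    b->slot[0] = NPROC;                       /* done by process 0 only */
}

/* Process pid reaches the barrier: fetch the count and decrement it. */
void barrierArrive(Barrier *b, int pid)
{
    int k = b->idx[pid];
    assert(k + 1 < ROUNDS && b->slot[k] > 0); /* the fetch succeeds */
    int c = b->slot[k] - 1;
    b->slot[k] = -1;                          /* fetched: absent for now */
    if (c == 0)
        b->slot[k + 1] = NPROC;               /* last: fresh count in next element */
    else
        b->slot[k] = c;                       /* not last: store the count back */
    b->idx[pid] = k + 1;                      /* every process moves on one element */
}

/* A process may continue once its fetchCopy of the next element would
   succeed, that is, once that element is present. */
int barrierReleased(const Barrier *b, int pid) { return b->slot[b->idx[pid]] >= 0; }
```

The first two arrivals leave the count at 1 and stay blocked; the third finds the count reaching zero, so the next element appears and all three are released. The next round then runs on that element with a full count again.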

Job jars

An important use of an unordered queue is as a job jar. The messages in the job jar indicate tasks to perform. Whenever a process needs more work to do, it fetches a message out of the job jar. Whenever a process creates more work to do, it drops messages in the job jar.

A job description will typically be a struct whose first member indicates the job to perform and whose other members provide additional parameters. Different jobs will have different structs, but by looking at the first member, which always has the same type and sits at the same place in the message, a process can determine what the job requires it to do.

There are two reasonable forms for the job indication:

  • an integer: the process will have to switch to the appropriate code for the job. To add new jobs, you have to modify the switch statement, so it's not very modular. But it's pretty safe: an out-of-range integer exposes some wrong messages in the job jar.
  • a procedure pointer: the process will simply call the procedure, passing it the job message. To add new job types, just add new procedures. The danger is that a wrong kind of message in the job jar, or an uninitialized procedure pointer, will crash the program. It is safer, therefore, to also pack in a distinctive long integer to check before calling the procedure: if the integer is correct, the procedure pointer is probably safe to use. (Notice that the procedure pointer is very like the script in an active message.)

Here are some declarations that might prove useful:

#define JobJar AMDC_AbsoluteSymbol(1,SYMB_HASH)
LocName jobjar={JobJar,0,0,0};
typedef struct {void (*action)(Message); 
    long check;} JobHeader;
#define JobHeaderCode 61413
typedef struct {JobHeader job; ......} Job;
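
Here is a sketch of how a process might run a job taken from the jar, using the procedure-pointer form together with the check code. Message is stood in for by void*, and AddJob, doAdd, runJob, and makeAddJob are hypothetical. Treating any job as a JobHeader* is valid C because a pointer to a struct, suitably converted, points to its first member.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins: Message is modeled as a plain void pointer. */
typedef void *Message;
typedef struct { void (*action)(Message); long check; } JobHeader;
#define JobHeaderCode 61413

/* One hypothetical job type: add n to a running total. */
typedef struct { JobHeader job; int n; } AddJob;
static int total = 0;
static void doAdd(Message m) { total += ((AddJob *)m)->n; }

/* Run one job taken from the jar. Returns 0 without calling the action
   if the check code is wrong or the procedure pointer is missing. */
int runJob(Message m)
{
    JobHeader *h = (JobHeader *)m;     /* the header is the first member */
    if (h->check != JobHeaderCode || h->action == NULL) return 0;
    h->action(m);
    return 1;
}

/* Build a correctly stamped AddJob, as a producer of work would. */
AddJob *makeAddJob(int n)
{
    AddJob *j = malloc(sizeof *j);
    assert(j != NULL);
    j->job.action = doAdd;
    j->job.check = JobHeaderCode;
    j->n = n;
    return j;
}
```

Adding a new job type means writing a new struct with a JobHeader first and a new action procedure; runJob itself never changes.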

Synchronous and asynchronous remote procedure calls

The procedures fetch, fetchCopy, and storeAt shown above are examples of remote procedure calls. A call is executed on one computer, but the substantive part of the procedure is executed on a different computer. The remote procedure calls can be synchronous or asynchronous.

  • synchronous calls return a value which the caller waits for. The remote part of the procedure completes before the call returns. Procedures fetch and fetchCopy are synchronous.
  • asynchronous calls do not wait for the remote part to terminate before returning. Asynchronous calls can create parallelism, but they are more difficult to use since the caller cannot assume the operation is finished. Procedure storeAt is asynchronous.

Actually, there are two varieties of asynchronous calls:

  • calls that return no result, like storeAt.
  • calls that return a result that arrives back at some later time. The caller can check for the arrival of the result or wait for it.

Remote procedures are not usually written as scripts, but rather as conventional procedures taking a variety of arguments. The paradigmatic synchronous remote procedure call is:

  1. caller calls a stub procedure locally
  2. the stub packs up a message, sends it to a location on the other machine, and waits for the response
  3. the script there unpacks the message to get the arguments and calls the actual procedure
  4. the procedure executes to completion
  5. the script packs up the returned value and by-result parameters into a message and sends it back to the caller
  6. the stub receives the response message, unpacks the results, and returns them to the caller.
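
The six steps can be sketched in one process by replacing the send and the wait with a direct call to the script. The integer-division example and all names (DivArgs, DivResults, divmodProc, divmodScript, divmodRemote) are hypothetical. Copying the structs byte for byte assumes both machines use the same data representation; a heterogeneous system would need to convert the fields.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical sketch; the network is replaced by a direct call. */
typedef struct { long a, b; } DivArgs;      /* request message body */
typedef struct { long q, r; } DivResults;   /* response message body */

/* 4. the actual procedure, with by-result parameters q and r */
static void divmodProc(long a, long b, long *q, long *r)
{
    assert(b != 0);
    *q = a / b;
    *r = a % b;
}

/* 3-5. the script at the remote location: unpack, call, pack the reply */
static void divmodScript(const char *req, char *resp)
{
    DivArgs in;
    DivResults out;
    memcpy(&in, req, sizeof in);               /* 3. unpack the arguments */
    divmodProc(in.a, in.b, &out.q, &out.r);    /* 4. run to completion */
    memcpy(resp, &out, sizeof out);            /* 5. pack the results */
}

/* 1, 2, 6. the local stub the caller sees */
void divmodRemote(long a, long b, long *q, long *r)
{
    char req[sizeof(DivArgs)], resp[sizeof(DivResults)];
    DivArgs in = { a, b };
    DivResults out;
    memcpy(req, &in, sizeof in);               /* 2. pack the request */
    divmodScript(req, resp);                   /* stands in for send and wait */
    memcpy(&out, resp, sizeof out);            /* 6. unpack the response */
    *q = out.q;
    *r = out.r;
}
```

To the caller, divmodRemote looks like an ordinary synchronous procedure; making it asynchronous would mean returning after step 2 and collecting the response message later.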

AMDC & Web Content Created by Thomas W. Christopher, George K. Thiruvathukal & Virgil Bistriceanu

Web Site Designed and Created by
Lance Larsen & Emad Shawakfa