/*************************************/
/* Demonstrate amdc unordered queues */
/* Kristi Cablk                      */
/*************************************/

/*
 * This simple program has the master node send a number of messages to a
 * named queue; the slave node fetches the messages out of the queue and
 * prints the data.  It demonstrates how storeAt() and fetch() can be used
 * to implement unordered queues.
 *
 * This program should be run with only 2 nodes.
 *
 * In most implementations of amdc, the messages will be delivered in the
 * order sent, so this program may not actually show the messages being
 * delivered out of order.
 */

#include "amdc96.h"

#include <stdio.h>   /* printf */
#include <string.h>  /* memcpy */

/* Message tags used at a location. */
#define FETCHED 1  /* reply delivered back to the process that called fetch() */
#define DATA    2  /* record stored in the queue, waiting to be fetched       */
#define FETCH   3  /* pending fetch request, waiting for a record to arrive   */

/* Payload carried by every record in this demo. */
typedef struct {
    int data1;
    int data2;
} GlobalData;

Location my_loc;  /* this process's own location; set in Main()   */
LocName queue;    /* name of the shared queue location; set in Main() */

Message fetch(LocName L);
void doFetch(Message m, Location loc);
void doStore(Message m, Location loc);
void storeAt(Message m, LocName L);

/*
 * This procedure is called by a process to fetch a record.
 *
 * It builds a Destiny (reply address) that names this process's location
 * with tag FETCHED, sends it to location L tagged FETCH with doFetch as
 * its script, and then polls until a FETCHED message shows up at my_loc.
 *
 * L: name of the queue location to fetch from.
 * Returns the message found at my_loc under tag FETCHED.
 */
Message fetch(LocName L)
{
    Message reply;
    Destiny *request = (Destiny *)amdc_rawMsg(sizeof(Destiny));

    *request = amdc_makeDest(amdc_getNameOfLoc(my_loc), FETCHED,
                             (Script)amdc_rawScript);
    amdc_sendToAsDo(request, L, FETCH, (Script)doFetch);

    /* Block in the amdc poll loop until the reply has been delivered. */
    while ((reply = amdc_getLoc(my_loc, FETCHED)) == NULL) {
        amdc_pollBlock();
    }
    return reply;
}

/*
 * This is a script to fetch a record.
 *
 * m:   fetch request; its payload is the Destiny written by fetch().
 * loc: the location the request was sent to.
 *
 * If a DATA record is present at loc it is sent to the requester's
 * Destiny and the request is freed; otherwise the request itself is put
 * at loc so that doStore() can run it when a record arrives.
 */
void doFetch(Message m, Location loc)
{
    Message record;
    Destiny reply_to;

    /* Copy the reply address out before m is possibly freed below. */
    memcpy(&reply_to, m, sizeof(Destiny));

    record = amdc_getLoc(loc, DATA);
    if (record != NULL) {
        amdc_sendToDest(record, reply_to);
        amdc_freeMsg(m);
    } else {
        amdc_putLoc(loc, m);
    }
}

/*
 * This is a script to store a record - it will execute
 * the scripts of waiting "fetch" messages immediately.
 *
 * m:   the record being stored.
 * loc: the location the record was sent to.
 */
void doStore(Message m, Location loc)
{
    Message waiting;

    /* Store first, so a waiting fetch script finds the record at loc. */
    amdc_putLoc(loc, m);

    waiting = amdc_getLoc(loc, FETCH);
    if (waiting != NULL) {
        (amdc_getScript(waiting))(waiting, loc);
    }
}

/*
 * This procedure is used by a process to store a
 * record at a location.
*/ void storeAt(Message m, LocName L) { amdc_sendToAsDo(m, L, DATA, (Script) doStore); } Main() { GlobalData *msg,*msg1,*msg2,*msg3; int i=0; my_loc=amdc_getMyLoc(); queue = amdc_0DLocName(AMDC_AbsoluteSymbol(0,AMDC_SYMB_HASH)); printf("I am node %d\n", amdc_getMyId()); if (amdc_getMyId() == 0) { /* generate some messages */ msg1 = amdc_newMsg((Script)amdc_rawScript, DATA, sizeof(GlobalData)); msg1->data1 = 1; msg1->data2 = 2; msg2 = amdc_newMsg((Script)amdc_rawScript, DATA, sizeof(GlobalData)); msg2->data1 = 3; msg2->data2 = 4; msg3 = amdc_newMsg((Script)amdc_rawScript, DATA, sizeof(GlobalData)); msg3->data1 = 5; msg3->data2 = 6; /* store the messages in the queue */ storeAt(msg1, queue); storeAt(msg2, queue); storeAt(msg3, queue); printf("Done node 0\n"); } else if (amdc_getMyId() == 1) { printf("\nI don't care if the data comes in order.\n"); while (i<3) { msg = (GlobalData*) fetch(queue); printf("Process %d received data: %d, %d\n", amdc_getMyId(), msg->data1, msg->data2); i++; } printf("Done node 1\n"); } else printf("\nThis sample program only supports 2 nodes!\n"); }