00001 // post.cc 00002 // Routines to deliver incoming network messages to the correct 00003 // "address" -- a mailbox, or a holding area for incoming messages. 00004 // This module operates just like the US postal service (in other 00005 // words, it works, but it's slow, and you can't really be sure if 00006 // your mail really got through!). 00007 // 00008 // Note that once we prepend the MailHdr to the outgoing message data, 00009 // the combination (MailHdr plus data) looks like "data" to the Network 00010 // device. 00011 // 00012 // The implementation synchronizes incoming messages with threads 00013 // waiting for those messages. 00014 // 00015 // Copyright (c) 1992-1993 The Regents of the University of California. 00016 // All rights reserved. See copyright.h for copyright notice and limitation 00017 // of liability and disclaimer of warranty provisions. 00018 00019 #include "copyright.h" 00020 #include "post.h" 00021 00022 extern "C" { 00023 int bcopy(char *, char *, int); 00024 }; 00025 00026 //---------------------------------------------------------------------- 00027 // Mail::Mail 00028 // Initialize a single mail message, by concatenating the headers to 00029 // the data. 00030 // 00031 // "pktH" -- source, destination machine ID's 00032 // "mailH" -- source, destination mailbox ID's 00033 // "data" -- payload data 00034 //---------------------------------------------------------------------- 00035 00036 Mail::Mail(PacketHeader pktH, MailHeader mailH, char *msgData) 00037 { 00038 ASSERT(mailH.length <= MaxMailSize); 00039 00040 pktHdr = pktH; 00041 mailHdr = mailH; 00042 bcopy(msgData, data, mailHdr.length); 00043 } 00044 00045 //---------------------------------------------------------------------- 00046 // MailBox::MailBox 00047 // Initialize a single mail box within the post office, so that it 00048 // can receive incoming messages. 00049 // 00050 // Just initialize a list of messages, representing the mailbox. 00051 //---------------------------------------------------------------------- 00052 00053 00054 MailBox::MailBox() 00055 { 00056 messages = new SynchList(); 00057 } 00058 00059 //---------------------------------------------------------------------- 00060 // MailBox::~MailBox 00061 // De-allocate a single mail box within the post office. 00062 // 00063 // Just delete the mailbox, and throw away all the queued messages 00064 // in the mailbox. 00065 //---------------------------------------------------------------------- 00066 00067 MailBox::~MailBox() 00068 { 00069 delete messages; 00070 } 00071 00072 //---------------------------------------------------------------------- 00073 // PrintHeader 00074 // Print the message header -- the destination machine ID and mailbox 00075 // #, source machine ID and mailbox #, and message length. 00076 // 00077 // "pktHdr" -- source, destination machine ID's 00078 // "mailHdr" -- source, destination mailbox ID's 00079 //---------------------------------------------------------------------- 00080 00081 static void 00082 PrintHeader(PacketHeader pktHdr, MailHeader mailHdr) 00083 { 00084 printf("From (%d, %d) to (%d, %d) bytes %d\n", 00085 pktHdr.from, mailHdr.from, pktHdr.to, mailHdr.to, mailHdr.length); 00086 } 00087 00088 //---------------------------------------------------------------------- 00089 // MailBox::Put 00090 // Add a message to the mailbox. If anyone is waiting for message 00091 // arrival, wake them up! 00092 // 00093 // We need to reconstruct the Mail message (by concatenating the headers 00094 // to the data), to simplify queueing the message on the SynchList. 00095 // 00096 // "pktHdr" -- source, destination machine ID's 00097 // "mailHdr" -- source, destination mailbox ID's 00098 // "data" -- payload message data 00099 //---------------------------------------------------------------------- 00100 00101 void 00102 MailBox::Put(PacketHeader pktHdr, MailHeader mailHdr, char *data) 00103 { 00104 Mail *mail = new Mail(pktHdr, mailHdr, data); 00105 00106 messages->Append((void *)mail); // put on the end of the list of 00107 // arrived messages, and wake up 00108 // any waiters 00109 } 00110 00111 //---------------------------------------------------------------------- 00112 // MailBox::Get 00113 // Get a message from a mailbox, parsing it into the packet header, 00114 // mailbox header, and data. 00115 // 00116 // The calling thread waits if there are no messages in the mailbox. 00117 // 00118 // "pktHdr" -- address to put: source, destination machine ID's 00119 // "mailHdr" -- address to put: source, destination mailbox ID's 00120 // "data" -- address to put: payload message data 00121 //---------------------------------------------------------------------- 00122 00123 void 00124 MailBox::Get(PacketHeader *pktHdr, MailHeader *mailHdr, char *data) 00125 { 00126 DEBUG('n', "Waiting for mail in mailbox\n"); 00127 Mail *mail = (Mail *) messages->Remove(); // remove message from list; 00128 // will wait if list is empty 00129 00130 *pktHdr = mail->pktHdr; 00131 *mailHdr = mail->mailHdr; 00132 if (DebugIsEnabled('n')) { 00133 printf("Got mail from mailbox: "); 00134 PrintHeader(*pktHdr, *mailHdr); 00135 } 00136 bcopy(mail->data, data, mail->mailHdr.length); 00137 // copy the message data into 00138 // the caller's buffer 00139 delete mail; // we've copied out the stuff we 00140 // need, we can now discard the message 00141 } 00142 00143 //---------------------------------------------------------------------- 00144 // PostalHelper, ReadAvail, WriteDone 00145 // Dummy functions because C++ can't indirectly invoke member functions 00146 // The first is forked as part of the "postal worker thread; the 00147 // later two are called by the network interrupt handler. 00148 // 00149 // "arg" -- pointer to the Post Office managing the Network 00150 //---------------------------------------------------------------------- 00151 00152 static void PostalHelper(int arg) 00153 { PostOffice* po = (PostOffice *) arg; po->PostalDelivery(); } 00154 static void ReadAvail(int arg) 00155 { PostOffice* po = (PostOffice *) arg; po->IncomingPacket(); } 00156 static void WriteDone(int arg) 00157 { PostOffice* po = (PostOffice *) arg; po->PacketSent(); } 00158 00159 //---------------------------------------------------------------------- 00160 // PostOffice::PostOffice 00161 // Initialize a post office as a collection of mailboxes. 00162 // Also initialize the network device, to allow post offices 00163 // on different machines to deliver messages to one another. 00164 // 00165 // We use a separate thread "the postal worker" to wait for messages 00166 // to arrive, and deliver them to the correct mailbox. Note that 00167 // delivering messages to the mailboxes can't be done directly 00168 // by the interrupt handlers, because it requires a Lock. 00169 // 00170 // "addr" is this machine's network ID 00171 // "reliability" is the probability that a network packet will 00172 // be delivered (e.g., reliability = 1 means the network never 00173 // drops any packets; reliability = 0 means the network never 00174 // delivers any packets) 00175 // "nBoxes" is the number of mail boxes in this Post Office 00176 //---------------------------------------------------------------------- 00177 00178 PostOffice::PostOffice(NetworkAddress addr, double reliability, int nBoxes) 00179 { 00180 // First, initialize the synchronization with the interrupt handlers 00181 messageAvailable = new Semaphore("message available", 0); 00182 messageSent = new Semaphore("message sent", 0); 00183 sendLock = new Lock("message send lock"); 00184 00185 // Second, initialize the mailboxes 00186 netAddr = addr; 00187 numBoxes = nBoxes; 00188 boxes = new MailBox[nBoxes]; 00189 00190 // Third, initialize the network; tell it which interrupt handlers to call 00191 network = new Network(addr, reliability, ReadAvail, WriteDone, (int) this); 00192 00193 00194 // Finally, create a thread whose sole job is to wait for incoming messages, 00195 // and put them in the right mailbox. 00196 Thread *t = new Thread("postal worker"); 00197 00198 t->Fork(PostalHelper, (int) this); 00199 } 00200 00201 //---------------------------------------------------------------------- 00202 // PostOffice::~PostOffice 00203 // De-allocate the post office data structures. 00204 //---------------------------------------------------------------------- 00205 00206 PostOffice::~PostOffice() 00207 { 00208 delete network; 00209 delete [] boxes; 00210 delete messageAvailable; 00211 delete messageSent; 00212 delete sendLock; 00213 } 00214 00215 //---------------------------------------------------------------------- 00216 // PostOffice::PostalDelivery 00217 // Wait for incoming messages, and put them in the right mailbox. 00218 // 00219 // Incoming messages have had the PacketHeader stripped off, 00220 // but the MailHeader is still tacked on the front of the data. 00221 //---------------------------------------------------------------------- 00222 00223 void 00224 PostOffice::PostalDelivery() 00225 { 00226 PacketHeader pktHdr; 00227 MailHeader mailHdr; 00228 char *buffer = new char[MaxPacketSize]; 00229 00230 for (;;) { 00231 // first, wait for a message 00232 messageAvailable->P(); 00233 pktHdr = network->Receive(buffer); 00234 00235 mailHdr = *(MailHeader *)buffer; 00236 if (DebugIsEnabled('n')) { 00237 printf("Putting mail into mailbox: "); 00238 PrintHeader(pktHdr, mailHdr); 00239 } 00240 00241 // check that arriving message is legal! 00242 ASSERT(0 <= mailHdr.to && mailHdr.to < numBoxes); 00243 ASSERT(mailHdr.length <= MaxMailSize); 00244 00245 // put into mailbox 00246 boxes[mailHdr.to].Put(pktHdr, mailHdr, buffer + sizeof(MailHeader)); 00247 } 00248 } 00249 00250 //---------------------------------------------------------------------- 00251 // PostOffice::Send 00252 // Concatenate the MailHeader to the front of the data, and pass 00253 // the result to the Network for delivery to the destination machine. 00254 // 00255 // Note that the MailHeader + data looks just like normal payload 00256 // data to the Network. 00257 // 00258 // "pktHdr" -- source, destination machine ID's 00259 // "mailHdr" -- source, destination mailbox ID's 00260 // "data" -- payload message data 00261 //---------------------------------------------------------------------- 00262 00263 void 00264 PostOffice::Send(PacketHeader pktHdr, MailHeader mailHdr, char* data) 00265 { 00266 char* buffer = new char[MaxPacketSize]; // space to hold concatenated 00267 // mailHdr + data 00268 00269 if (DebugIsEnabled('n')) { 00270 printf("Post send: "); 00271 PrintHeader(pktHdr, mailHdr); 00272 } 00273 ASSERT(mailHdr.length <= MaxMailSize); 00274 ASSERT(0 <= mailHdr.to && mailHdr.to < numBoxes); 00275 00276 // fill in pktHdr, for the Network layer 00277 pktHdr.from = netAddr; 00278 pktHdr.length = mailHdr.length + sizeof(MailHeader); 00279 00280 // concatenate MailHeader and data 00281 bcopy((char *) &mailHdr, buffer, sizeof(MailHeader)); 00282 bcopy(data, buffer + sizeof(MailHeader), mailHdr.length); 00283 00284 sendLock->Acquire(); // only one message can be sent 00285 // to the network at any one time 00286 network->Send(pktHdr, buffer); 00287 messageSent->P(); // wait for interrupt to tell us 00288 // ok to send the next message 00289 sendLock->Release(); 00290 00291 delete [] buffer; // we've sent the message, so 00292 // we can delete our buffer 00293 } 00294 00295 //---------------------------------------------------------------------- 00296 // PostOffice::Send 00297 // Retrieve a message from a specific box if one is available, 00298 // otherwise wait for a message to arrive in the box. 00299 // 00300 // Note that the MailHeader + data looks just like normal payload 00301 // data to the Network. 00302 // 00303 // 00304 // "box" -- mailbox ID in which to look for message 00305 // "pktHdr" -- address to put: source, destination machine ID's 00306 // "mailHdr" -- address to put: source, destination mailbox ID's 00307 // "data" -- address to put: payload message data 00308 //---------------------------------------------------------------------- 00309 00310 void 00311 PostOffice::Receive(int box, PacketHeader *pktHdr, 00312 MailHeader *mailHdr, char* data) 00313 { 00314 ASSERT((box >= 0) && (box < numBoxes)); 00315 00316 boxes[box].Get(pktHdr, mailHdr, data); 00317 ASSERT(mailHdr->length <= MaxMailSize); 00318 } 00319 00320 //---------------------------------------------------------------------- 00321 // PostOffice::IncomingPacket 00322 // Interrupt handler, called when a packet arrives from the network. 00323 // 00324 // Signal the PostalDelivery routine that it is time to get to work! 00325 //---------------------------------------------------------------------- 00326 00327 void 00328 PostOffice::IncomingPacket() 00329 { 00330 messageAvailable->V(); 00331 } 00332 00333 //---------------------------------------------------------------------- 00334 // PostOffice::PacketSent 00335 // Interrupt handler, called when the next packet can be put onto the 00336 // network. 00337 // 00338 // The name of this routine is a misnomer; if "reliability < 1", 00339 // the packet could have been dropped by the network, so it won't get 00340 // through. 00341 //---------------------------------------------------------------------- 00342 00343 void 00344 PostOffice::PacketSent() 00345 { 00346 messageSent->V(); 00347 } 00348
1.2.14 written by Dimitri van Heesch,
© 1997-2002