/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Sept. 6, 2005 *******************************************************************/ //-*-c++-*- #ifndef LTSFINPUTQUEUE_CC #define LTSFINPUTQUEUE_CC // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. // // $Id: LTSFInputQueue.cc,v 1.1.1.1 2007/03/15 15:45:05 rmadhoun Exp $ //--------------------------------------------------------------------------- #include "LTSFInputQueue.hh" #include "../../../JackyDebugStream.h" #ifdef JACKY_DEBUG #include #include "../../../message.h" #include "../../../pmessage.h" #endif LTSFInputQueue::LTSFInputQueue() { setFunc( BasicEventCompareRecv ); initialized = false; noOfEventsCommitted = 0; memoryUseage = 0; #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] //define variables to measure the performance positiveMsgNumber = 0; negativeMsgNumber = 0; rollbackNumber = 0; positiveStragglerMsgNumber = 0; negativeStragglerMsgNumber = 0; totalNumberOfEventsRolledBack = 0; #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] } void LTSFInputQueue::reset() { int i; initialized = false; listsize = 0; // initialize all listvalues head = NULL; tail = NULL; currentPos = NULL; insertPos = NULL; findPos = NULL; for (i = 0; i < numObjects; i++) { findObj[i] = insertObj[i] = currentObj[i] = NULL; headObj[i] = tailObj[i] = NULL; listSize[i] = 0; lVTArray[i] = ZERO; idleObjArray[i] = false; } } LTSFInputQueue::~LTSFInputQueue() { } #ifndef ONE_ANTI_MESSAGE //[2006-03-09] ------------------------------------------------------------------------- //Jacky Note: //this function is for NOT using "ONE_ANTI_MESSAGE" inline void LTSFInputQueue::miniListUnprocessMessages(BasicEvent* toInsert, int id) { #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "*** inputQ::miniListUnprocessMessage() called for localId = " << id << " / toInsert = {" << *toInsert << "}" << endl << flush; if(currentObj[id] != NULL){ jacky_os << "*** inputQ::miniListUnprocessMessage() -> currentObj[" << id << "] = {" << *(currentObj[id]) << "}" << endl << flush; } else { jacky_os << "*** inputQ::miniListUnprocessMessage() -> currentObj[" << id << "] = {NULL}" << endl << flush; } jacky_os << "*** inputQ::miniListUnprocessMessage() -> save currentPos to tmpCurrentPtr, currentPos = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } #endif //end JACKY_DEBUG tmpCurrentPtr = currentPos; // save the old pointers currentPos = currentObj[id]; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> Now, set currentPos = currentObj[" << id << "]" << " = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } #endif //end JACKY_DEBUG if (currentPos != NULL) { //currentPos = currentObj[id] != NULL unprocessPtr = currentPos; // Set the item we inserted to true, and start unprocessing messages // (can't use currentPos because that points to the first message at // the same time instead of our message.) toInsert->alreadyProcessed = true; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> set toInsert->alreadyProcessed = TRUE!!! toInsert = {" << (*toInsert) << "}" << endl << flush; #endif //end JACKY_DEBUG } else { ////currentPos = currentObj[id] = NULL unprocessPtr = NULL; } #ifdef JACKY_DEBUG if ( unprocessPtr != NULL ) { jacky_os << "*** inputQ::miniListUnprocessMessage() -> set unprocessPtr = {" << (*unprocessPtr) << "}" << endl << flush; } else { jacky_os << "*** inputQ::miniListUnprocessMessage() -> set unprocessPtr = {NULL}" << endl << flush; } #endif //end JACKY_DEBUG // unprocess all messages until we get a message that hasn't been // processed, or we reach the end of the queue. while(unprocessPtr != NULL && unprocessPtr->alreadyProcessed == true) { unprocessPtr->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> Unprocess {" << (*unprocessPtr) << "}" << endl << flush; #endif //end JACKY_DEBUG #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] //unprocess one event totalNumberOfEventsRolledBack++; #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] currentPos = currentPos->nextObj; if (currentPos == NULL) { unprocessPtr = NULL; } else { unprocessPtr = currentPos; } } //end while // reset alreadyProcessed flag in case the above didn't do it // (this should not be necessary, but I'll check later). toInsert->alreadyProcessed = false; // reset currentPos; currentPos = tmpCurrentPtr; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> set toInsert->alreadyProcessed = FALSE!!! toInsert = {" << (*toInsert) << "}" << endl << flush; jacky_os << "*** inputQ::miniListUnprocessMessage() -> restore currentPos = tmpCurrentPtr = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } jacky_os << "********************* inputQ::miniListUnprocessMessage() END ***********************" << endl << flush; #endif //end JACKY_DEBUG } #else //[2006-03-09] This is for ONE_ANTI_MESSAGE ---------------------------------------------------------------------- //Jacky Note: //this function is for "ONE_ANTI_MESSAGE" optimization! //we need to do different things based on the "implodeProcessed" flag passed in: // 1. if implodeProcessed = true (i.e. the imploded [+] msg has been processed) // --> we need to do: // a> cancel all [+] msgs that were from the same sender and have sendTime >= [-]msg.sendTime // b> unprocess all other messages with recvTime >= [-]msg.recvTime // 2. if implodeProcessed = false (i.e. the imploded [+] msg has not yet been processed) // --> we only need to do message cancellation! No message unprocessing should be done in this case! inline void LTSFInputQueue::miniListUnprocessMessages(BasicEvent* toInsert, int id, bool implodeProcessed) { BasicEvent *deletePtr = NULL; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "=================== [One-Anti] Unprocessing ==========================" << endl << flush; jacky_os << "*** inputQ::miniListUnprocessMessage() called for localId = " << id << " / toInsert = {" << *toInsert << "}" << endl << flush; if(currentObj[id] != NULL){ jacky_os << "*** inputQ::miniListUnprocessMessage() -> currentObj[" << id << "] = {" << *(currentObj[id]) << "}" << endl << flush; } else { jacky_os << "*** inputQ::miniListUnprocessMessage() -> currentObj[" << id << "] = {NULL}" << endl << flush; } jacky_os << "*** inputQ::miniListUnprocessMessage() -> save currentPos to tmpCurrentPtr, currentPos = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } #endif //end JACKY_DEBUG tmpCurrentPtr = currentPos; // save the old pointers currentPos = currentObj[id]; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> Now, set currentPos = currentObj[" << id << "]" << " = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } #endif //end JACKY_DEBUG if (currentPos != NULL) { //currentPos = currentObj[id] != NULL unprocessPtr = currentPos; // Set the item we inserted to true, and start unprocessing messages // (can't use currentPos because that points to the first message at // the same time instead of our message.) toInsert->alreadyProcessed = true; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListUnprocessMessage() -> set toInsert->alreadyProcessed = TRUE!!! toInsert = {" << (*toInsert) << "}" << endl << flush; #endif //end JACKY_DEBUG } else { ////currentPos = currentObj[id] = NULL unprocessPtr = NULL; } #ifdef JACKY_DEBUG if ( unprocessPtr != NULL ) { jacky_os << "*** inputQ::miniListUnprocessMessage() -> set unprocessPtr = {" << (*unprocessPtr) << "}" << endl << flush; } else { jacky_os << "*** inputQ::miniListUnprocessMessage() -> set unprocessPtr = {NULL}" << endl << flush; } #endif //end JACKY_DEBUG //for ONE_ANTI_MESSAGE, we will get only one anti-message for each rollback. //So go ahead and delete all the messages corresponding to the object that sent the negative message. //[-] msg do: // -> a> msg cancellation (always do this!) // b> unprocessing iff implodeProcessed = true if ((toInsert->sign == NEGATIVE) && (unprocessPtr != NULL)) { //get the sender ID for this [-] message int negativeSourceId = toInsert->sender; //save the address of the BasicEvent* after which we need to do unprocessing BasicEvent* sanityPtr = unprocessPtr->prevObj; while (unprocessPtr != NULL) { deletePtr = unprocessPtr; unprocessPtr = unprocessPtr->nextObj; /* #ifdef JACKY_DEBUG jacky_os << endl << "deletePtr ->\t[" << *deletePtr << "]" << endl << flush; jacky_os << "set unprocessPtr ->\t[" << flush; if(unprocessPtr != NULL){ jacky_os << *unprocessPtr << "]" << endl << flush; } else { jacky_os << "NULL]!" << endl << flush; } #endif //end JACKY_DEBUG */ if ( (deletePtr->sender == negativeSourceId) #ifdef JACKY_ONE_ANTI_MESSAGE //[2006-03-06] //we need also check the sendTime of these msgs to be greater than or equal to //that of the [-] msg. That is, we delete all messages coming from the same sender //and having equal or greater sendTime. && (toInsert->sendTime <= deletePtr->sendTime) #endif //end JACKY_ONE_ANTI_MESSAGE [2006-03-06] ) { //1. from same sender with sendTime >= toInsert->sendTime ==> cancel event //for all BasicEvents from the same sender after the implosion, destroy //the BasicEvents on the inputQ after the implosion! //since no more [-] msg will come from that sender! if (tmpCurrentPtr == deletePtr) { tmpCurrentPtr = tmpCurrentPtr->next; #ifdef JACKY_DEBUG jacky_os << endl << "[One-Anti] tmpCurrentPtr Cancelled!!! -> reset tmpCurrentPtr = [" << flush; if(tmpCurrentPtr != NULL){ jacky_os << *tmpCurrentPtr << "]" << endl << flush; } else { jacky_os << "NULL]!" << endl << flush; } #endif //end JACKY_DEBUG } #ifdef LPDEBUG *lpFile << "OAM-Scavenged : " << *deletePtr << endl; #endif //end LPDEBUG #ifdef JACKY_DEBUG jacky_os << endl << "\t!!![One-Anti]!!! ->" << " !!! Delete event:" << " [" << *deletePtr << "]" << endl << endl << flush; #endif //end JACKY_DEBUG #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] if(deletePtr->alreadyProcessed == true){ totalNumberOfEventsRolledBack++; } #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] delete [] (char *)MultiList::remove(deletePtr, id); } else { //2. not from same sender OR sendTime < toInsert->sendTime //do unprocessing iff implodeProcessed = true if( (implodeProcessed == true) && (deletePtr->alreadyProcessed == true) ){ deletePtr->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "[One-Anti]-> implodeProcessed == true, Unprocess {" << *deletePtr << "}" << endl << flush; #endif //end JACKY_DEBUG #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] totalNumberOfEventsRolledBack++; #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] } //end if }//end msg cancellation & unprocessing } //end while unprocessPtr != NULL //now, unprocessPtr is NULL! if (sanityPtr != NULL) { currentObj[id] = sanityPtr->nextObj; #ifdef JACKY_DEBUG jacky_os << endl << "\t===[One-Anti]=== LTSFInputQ::miniListUnprocessMessages() ->" << " sanityPtr != NULL, set currentObj[" << id << "] = " << flush; if(currentObj[id] != NULL) { jacky_os << *(currentObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif //end JACKY_DEBUG } else { currentObj[id] = headObj[id]; #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== LTSFInputQ::miniListUnprocessMessages() ->" << " sanityPtr == NULL, set currentObj[" << id << "] = headObj[" << id << "] = " << flush; if(headObj[id] != NULL) { jacky_os << *(headObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif //end JACKY_DEBUG } //end reset currentObj[id] to the 1st unprocessed event or NULL if no unprocessed event left #ifdef LPDEBUG *lpFile << "OAM-Scavenged " << scavengeCount << endl; *lpFile << "OAM-Reset " << resetNumber << " events" << endl; #endif //end LPDEBUG } //end if (toInsert->sign == [-] && unprocessPtr != NULL) else { //toInsert->sign == [+] OR unprocessPtr == NULL //for [+] msg, do normal unprocessing; while(unprocessPtr != NULL && unprocessPtr->alreadyProcessed == true) { //Jacky Note: //this while loop is for [+] msg && unprocessPtr != NULL unprocessPtr->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== inputQ::miniListUnprocessMessage() !! [+] msg !! -> Unprocess {" << (*unprocessPtr) << "}" << endl << flush; #endif //end JACKY_DEBUG #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] totalNumberOfEventsRolledBack++; #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] currentPos = currentPos->nextObj; if (currentPos == NULL) { unprocessPtr = NULL; } else { unprocessPtr = currentPos; } } //end while } //end [+]/[-] msg //Jacky Note: Both [-] & [+] msg will do the following! #ifdef JACKY_DEBUG jacky_os << endl << "===[One-Anti]=== -> before reset toInsert->processed=F && restore currentPos = tmpCurrentPtr" << endl << flush; #endif // reset alreadyProcessed flag in case the above didn't do it // (this should not be necessary, but I'll check later). toInsert->alreadyProcessed = false; // reset currentPos; currentPos = tmpCurrentPtr; #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== inputQ::miniListUnprocessMessage() -> " << "set toInsert->alreadyProcessed = FALSE!!!" << endl << " toInsert = {" << (*toInsert) << "}" << endl << flush; jacky_os << endl << "\t===[One-Anti]=== inputQ::miniListUnprocessMessage() -> restore currentPos = {"; if(currentPos != NULL){ jacky_os << (*currentPos) << "}" << endl << flush; } else { jacky_os << "NULL}" << endl << flush; } //print the miniList for id BasicEvent* inputQ_cursor; jacky_os << "\t===[One-Anti]=== miniList for localId = " << id << " ===========" << endl << flush; if(headObj[id] != NULL){ inputQ_cursor = (BasicEvent*)headObj[id]; TWMessage* temp = dynamic_cast(inputQ_cursor); while(inputQ_cursor != NULL){ jacky_os << "<" << " id=" << inputQ_cursor->eventId << " sign=" << inputQ_cursor->sign << " " << inputQ_cursor->sender << "@" << inputQ_cursor->sendTime << " => " << inputQ_cursor->dest << "@" << inputQ_cursor->recvTime << " Processed?" << inputQ_cursor->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP if(temp->derivedFromExternalEvent() == true){ jacky_os << " [Event!]" << flush; } #endif jacky_os << " >" << endl << flush; inputQ_cursor = inputQ_cursor->nextObj; } jacky_os << "--------------------- printing END ------------------------------------- " << endl << flush; } else { jacky_os << "\t===[One-Anti]=== LTSFInputQueue::miniListUnprocessMessages() " << " There are 0 elements in the miniList for localId " << id << "! ========" << endl << flush; } jacky_os << "\t===[One-Anti]=== ****** inputQ::miniListUnprocessMessage() END ******" << endl << endl << flush; #endif //end JACKY_DEBUG } #endif //end ONE_ANTI_MESSAGE [2006-03-09] ----------------------------------------------------------------------------- #ifndef JACKY_REVISION inline bool LTSFInputQueue::miniListRollback(BasicEvent *toInsert, int id) { #else //Jacky: implodeProcessed is TRUE only if we got an anti-message, and this anti-message imploded its // (+) counterpart that has already been processed // Note: if the (+) msg has NOT been processed, receiving the anti-message just trigger an implosion // But, if the (+) msg has already been processed, after the implosion, we have to rollback to // the Last (+) msg just before the (+) msg that has been imploded! inline bool LTSFInputQueue::miniListRollback(BasicEvent *toInsert, int id, bool implodeProcessed) { #endif //end JACKY_REVISION bool inMiniListPast = false; // check to see if message is in minilist's past. In most cases lVTArray // will have the correct time of the last message executed, so this will // be used to determine rollbacks. There are some cases where this time // will be incorrect. This is for a negative message cancelling the last // message in the miniqueue[*] In this case the recvTime of the antimessage // will be taken as the last time executed. This can cause another rollback // which will be unnecessary if another message is received with a // a timestamp less than the antimessage and greater than the state // restored, but processing will still be correct. // [*] - the miniqueue is an abstraction of the multiqueue where each // destination in the multiqueue has a miniqueue that contains just // the messages for that destination. All pointers that end with "Obj" // are referring to the miniqueue. // !!!!!! 23/01/2001 Originalmente: toInsert->recvTime <= lVTArray[id] // Ahora cambia el <= por el < para poder recibir mensajes simultaneos. #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t*** inputQ::miniListRollback() called for localId = " << id << " toInsert = {" << *toInsert << "}" << endl << flush; #endif #ifndef JACKY_REVISION if ( toInsert->recvTime < lVTArray[id] ) { #else //Jacky's revision if ( (toInsert->recvTime < lVTArray[id]) || (implodeProcessed && (toInsert->recvTime <= lVTArray[id]) ) ) { #endif #ifdef JACKY_DEBUG if(implodeProcessed == false){ jacky_os << "\t*** inputQ::miniListRollback() -> toInsert->recvTime < lVTArray[" << id << "](" << lVTArray[id].asString() << ")" << endl << flush; } else { jacky_os << "\t*** inputQ::miniListRollback() -> implodeProcessed = TRUE && " << "toInsert->recvTime <= lVTArray[" << id << "](" << lVTArray[id].asString() << ")" << endl << flush; } #endif if (toInsert->sign == POSITIVE) { //toInsert->sign == POSITIVE 1{ #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> (+) msg call MultiList::find(==)" << endl << flush; #endif MultiList::find(toInsert, EQUAL, id); currentObj[id] = findPos; #ifdef JACKY_DEBUG if(findPos != NULL){ jacky_os << "*** inputQ::miniListRollback() -> set currentObj["<< id << "] = " << (*findPos) << endl << flush; } else { jacky_os << "*** inputQ::miniListRollback() -> find(==) get NULL, set currentObj[" << id << "] = NULL" << endl << flush; } #endif } else { //toInsert->sign == NEGATIVE 1} 2{ #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> (-) msg call MultiList::find(>=)" << endl << flush; #endif MultiList::find(toInsert, GREATEREQUAL, id); currentObj[id] = findPos; #ifdef JACKY_DEBUG if(findPos != NULL){ jacky_os << "*** inputQ::miniListRollback() -> set currentObj["<< id << "] = " << (*findPos) << endl << flush; } else { jacky_os << "*** inputQ::miniListRollback() -> find(>=) get NULL, set currentObj[" << id << "] = NULL" << endl << flush; } #endif } //end NEGATIVE 2} // Now we need to unprocess some messages from the miniqueue that is being rolledback. // This is called in the following scenarios: // A> one-anti-message is NOT used: // 1. toInsert is [+] && toInsert->recvTime < lVTArray[id] OR // 2. toInsert is [-] && an already processed [+] is imploded && toInsert->recvTime <= lVTArray[id] // B> using one-anti-message: // 1. if toInsert is [+] && toInsert->recvTime < lVTArray[id] // => do normal unprocessing just like one-anti-message is not used; // 2. if toInsert is [-] && an already processed [+] is imploded && toInsert->recvTime <= lVTArray[id] // => do unprocessing and at the same time cancel all [+] messages from the same sender with // sendTime >= [-]msg.sendTime // This is the case when the 1st round of [-] is received! #ifndef ONE_ANTI_MESSAGE //[2006-03-09] ------------------------------------------------------------------------- miniListUnprocessMessages(toInsert, id); #else //[2006-03-09] This is for ONE_ANTI_MESSAGE --------------------------------------------------------------- miniListUnprocessMessages( toInsert, id, implodeProcessed ); #endif //end ONE_ANTI_MESSAGE [2006-03-09] ---------------------------------------------------------------------- // This is the state that is going to be restored, so lets reset // our lVT value to it since any future message should not cause // a rollback except if it's before this time. if (currentObj[id] != NULL) { if (currentObj[id]->prevObj != NULL) { lVTArray[id] = currentObj[id]->prevObj->recvTime; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> set lVTArray["<< id << "] = " << (currentObj[id]->prevObj->recvTime).asString() << endl << flush; #endif } else { lVTArray[id] = currentObj[id]->recvTime; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> set lVTArray["<< id << "] = " << (currentObj[id]->recvTime).asString() << endl << flush; #endif } } else { //currentObj[id] == NULL // currentObj is NULL so lets see if we have any processed messages // in our list, and if so then let's use them to determine the time // otherwise use the recieve time of the message. if (tailObj[id] == NULL) { // This is the time we are going to rollback to even if this is // a negative message. lVTArray[id] = lastGVT; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> set lVTArray["<< id << "] = lastGVT = " << lVTArray[id].asString() << endl << flush; #endif } else { //tailObj[id] != NULL if (tailObj[id]->alreadyProcessed == true) { lVTArray[id] = tailObj[id]->recvTime; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> set lVTArray[" << id << "] = tailObj[id]->recvTime = " << (lVTArray[id]).asString() << endl << flush; #endif } else { lVTArray[id] = lastGVT; #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> set lVTArray[" << id << "] = lastGVT = " << lVTArray[id].asString() << endl << flush; #endif } } //end tailObj[id] == NULL } //end currentObj[id] != NULL #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> inThePast = TRUE !!!! rollback will follow!!!" << endl << flush; #endif inMiniListPast = true; } else { //([+] && toInsert->recvTime >= lVTArray[id]) || ([-] && toInsert->recvTime > lVTArray[id]) #ifdef JACKY_DEBUG jacky_os << "\t*** inputQ::miniListRollback() -> toInsert->recvTime >= lVTArray[" << id << "](" << lVTArray[id].asString() << ")" << endl << flush; #endif #ifndef JACKY_REVISION // we had no events and now we have one, or we have a new one // before currentObj, so reset currentObj to point to the new mesage // for positive messages, or to any message at or after the message // for a negative message. if ( currentObj[id] == NULL || currentObj[id] != NULL && (toInsert->recvTime < currentObj[id]->recvTime || (toInsert->recvTime == currentObj[id]->recvTime && // "<=" has been set due to reasons explained in insert() toInsert->dest <= currentObj[id]->dest) ) ) { #else //Jacky's revision (using <) //Jacky: since events are inserted into the inputQ by arrival order among //EQUAL events, I think we should use < rather than <= here!!! if ( currentObj[id] == NULL || currentObj[id] != NULL && (toInsert->recvTime < currentObj[id]->recvTime || (toInsert->recvTime == currentObj[id]->recvTime && toInsert->dest < currentObj[id]->dest) ) ) { #endif //end JACKY_REVISION if (toInsert->sign == POSITIVE) { currentObj[id] = insertObj[id]; #ifdef JACKY_DEBUG if(currentObj[id] != NULL){ jacky_os << "\t*** inputQ::miniListRollback() -> " << "(+) msg set currentObj[id] = insertObj[id] = " << (*currentObj[id]) << endl << flush; } else { jacky_os << "\t*** inputQ::miniListRollback() -> " << "(+) msg set currentObj[id] = insertObj[id] = NULL!" << endl << flush; } #endif } else { //toInsert->sign == [-] #ifdef JACKY_DEBUG jacky_os << "*** inputQ::miniListRollback() -> (-) msg, call find(>=) to set currentObj[" << id << "]" << endl << flush; #endif MultiList::find(toInsert, GREATEREQUAL, id); currentObj[id] = findPos; #ifdef JACKY_DEBUG if(findPos != NULL){ jacky_os << "*** inputQ::miniListRollback() -> (-) msg, " << "call MultiList::find(>=), set" << "currentObj[id] = findPos = " << (*findPos) << endl << flush; } else { jacky_os << "*** inputQ::miniListRollback() -> (-) msg, " << "call MultiList::find(>=), set" << "currentObj[id] = findPos = NULL!" << endl << flush; } #endif #ifdef ONE_ANTI_MESSAGE // -------------------------------------------------------------------------------------- //[2006-03-06] //Jacky Note: //This is necessary for the following scenario: //The receiver has rolled back to a previous time (e.g. FC has rolled back to time 000) //Then more [-] msg arrive at the receiver, but after implosion, the [-] msg has a //recvTime > the restored time at the receiver (e.g. cell 3 sends [-] msg with a recvTime //of 1000), the receiver needs to delete all messages from the sender with sendTime >= //[-]msg.sendTime! //E.g. the FC needs to delete all msgs from cell 3 with sendTime >= 1000! //This is called when the [-] does not cause a rollback on the receiver! #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== No RB, after adjust currentObj[id]... " << " call miniListUnprocessMessages() again!" << endl << endl << flush; #endif miniListUnprocessMessages( toInsert, id, implodeProcessed ); #endif //end ONE_ANTI_MESSAGE ---------------------------------------------------------------------------------- } //end toInsert->sign == [-] } //end adjust currentObj[id] #ifdef ONE_ANTI_MESSAGE // -------------------------------------------------------------------------------------- //[2006-03-06] //Jacky Note: //After the 1st round of implosion, the lVTArray[id] has been adjusted so that [-] msgs arrived later //only do message implosion, no futher unprocessing will be done. However, when using one-anti-message, //we need the following [-] msgs also do message cancellation on the receiver's inputQ to cancel those //[+] msgs from the same sender and with sendTime >= the sendTime of the [-] msg! else { //if we do not need to adjust currentObj[id] // cout << "The special case we found just now" << endl; if(toInsert->sign == NEGATIVE){ //[2006-03-06] //Jacky Note: //we need to restore currentObj[id] to the 1st unprocessed event or NULL if there //is no unprocessed event on the InputQueue #ifndef JACKY_ONE_ANTI_MESSAGE //[2006-03-06] //Jacky Note: //this is the original version to save currentObj[id] before doing unprocessing //then restore it after unprocessing. However, if all unprocessed events have been //cancelled during unprocessing, the currentObj[id] will be restored to an already //nonexistant event! //So we need to save the event that is the LAST processed one, if it exists. //after unprocessing, we restore currentObj[id] to the event after this saved one. //It will be the 1st unprocessed one or NULL if no unprocessed event exists on the //input queue BasicEvent *oldCurrentPos = currentObj[id]; #else //[2006-03-06] ============================================================================================= //Save the LAST processed event! (rather than the 1st unprocessed event since this //may be cancelled during unprocessing!) BasicEvent *oldCurrentPos = currentObj[id]->prevObj; #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== save currentObj[" << id << "]->prevObj = " << "oldCurrentPos = " << flush; if(oldCurrentPos != NULL) { jacky_os << *(oldCurrentPos) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif #endif //end JACKY_ONE_ANTI_MESSAGE [2006-03-06] MultiList::find(toInsert, GREATEREQUAL, id); currentObj[id] = findPos; #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== find (>=) currentObj[" << id << "] = " << flush; if(currentObj[id] != NULL) { jacky_os << *(currentObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } jacky_os << endl << "\t ===[One-Anti]=== No RB, No adjust currentObj[" << id << "]! " << " call miniListUnprocessMessages() again!" << endl << endl << flush; #endif //this is called in following rounds of [-] messages when rollback is not necessary miniListUnprocessMessages( toInsert, id, implodeProcessed ); #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== after unprocess() currentObj[" << id << "] = " << flush; if(currentObj[id] != NULL) { jacky_os << *(currentObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif #ifndef JACKY_ONE_ANTI_MESSAGE //[2006-03-06] //restore currentObj[id] to the previously saved one //in this version, there is a problem that the oldCurrentPos may have been calcelled //during unprocessing! currentObj[id] = oldCurrentPos; #else //[2006-03-06] ============================================================================================= //restore currentObj[id] to the 1st unprocessed event if it exists; otherwise, //currentObj[id] should be NULL! if( oldCurrentPos != NULL ){ //restore to the event after the LAST processed one currentObj[id] = oldCurrentPos->nextObj; //currentObj[id] is now the 1st unprocessed event or NULL if no unprocessed //event exists on the inputQ #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== set currentObj[" << id << "] = " << " oldCurrentPos->nextObj = " << flush; if(currentObj[id] != NULL) { jacky_os << *(currentObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } jacky_os << endl << flush; #endif } else { //oldCurrentPos == NULL, i.e. there is no processed event! //No processed event, currentObj[id] should be the 1st unprocessed event currentObj[id] = headObj[id]; #ifdef JACKY_DEBUG jacky_os << endl << "\t ===[One-Anti]=== set currentObj[" << id << "] = " << " headObj[id] = " << flush; if(headObj[id] != NULL) { jacky_os << *(headObj[id]) << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } jacky_os << endl << flush; #endif } #endif //end JACKY_ONE_ANTI_MESSAGE [2006-03-06] } } #endif //end ONE_ANTI_MESSAGE ---------------------------------------------------------------------------------- } //end ([+] && toInsert->recvTime >= lVTArray[id]) || ([-] && toInsert->recvTime > lVTArray[id]) #ifdef JACKY_DEBUG jacky_os << "\t\t*** inputQ::miniListRollback() END -> inThePast = " << inMiniListPast <<" ***" << endl << flush; #endif return inMiniListPast; } VTime LTSFInputQueue::getLVTForThisObj(int id){ BasicEvent* unProcessMsg; unProcessMsg = headObj[id]; if(unProcessMsg->alreadyProcessed == false){ return(unProcessMsg->recvTime); } else{ while(unProcessMsg->alreadyProcessed == true){ unProcessMsg = unProcessMsg->nextObj; } return(unProcessMsg->prevObj->recvTime); } } #ifdef JACKY_RB_EXCEPTION // Nov. 5, 2005 ==================================================== /* Function to handle rollback exceptional situations, where we need to remove a straggler ** event from the inputQ. This is for the following situation: ** The NC sent the 1st (X) message in the NCBag to the FC and triggered rollbacks on that ** machine. This straggler (X) message (event) should be removed form the inputQ. ** We use the similar logic for message implosion as in the insert function. The difference ** is that the implosion is triggered by the straggler position message instead of a negative ** one. */ void LTSFInputQueue::removeStragglerEvent(BasicEvent *toRemove, int id){ register BasicEvent *objectPtr; bool msgCancel = false; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "LTSFInputQueue::removeStragglerEvent() -> toRemove = " << (*toRemove) << endl << flush; //let's print out the current inputQ BasicEvent* cursor; //jacky_os << "----REMOVE STRAGGLER------- Current inputQ -----------------------" << endl << flush; //cursor = (BasicEvent*)head; //while(cursor != NULL){ // jacky_os << "<" // << " id=" << cursor->eventId // << " sign=" << cursor->sign // << " " << cursor->sender << "@" << cursor->sendTime // << " => " << cursor->dest << "@" << cursor->recvTime // << " alreadyProcessed?" << cursor->alreadyProcessed << " >" << endl << flush; // cursor = cursor->next; //} //jacky_os << endl << flush; //jacky_os << "----removeStragglerEvent()-- miniList pointers for localId = " << id << " ------------" // << endl << flush; //if(headObj[id] != NULL){ // jacky_os << "\t headObj[" << id << "] = {" << *(headObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t headObj[" << id << "] = NULL" << endl << flush; //} //if(tailObj[id] != NULL){ // jacky_os << "\t tailObj[" << id << "] = {" << *(tailObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t tailObj[" << id << "] = NULL" << endl << flush; //} //if(findObj[id] != NULL){ // jacky_os << "\t findObj[" << id << "] = {" << *(findObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t findObj[" << id << "] = NULL" << endl << flush; //} //jacky_os << endl << flush; jacky_os << "----LTSFInputQueue::removeStragglerEvent()-- BEFORE: miniList for localId = " << id << " -------------" << endl << flush; if(headObj[id] != NULL){ cursor = (BasicEvent*)headObj[id]; TWMessage* temp = dynamic_cast(cursor); while(cursor != NULL){ jacky_os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " => " << cursor->dest << "@" << cursor->recvTime << " Processed?" << cursor->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ jacky_os << " [Event!]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ jacky_os << " >" << endl << flush; cursor = cursor->nextObj; } jacky_os << "--------------------- printing END ------------------------------------- " << endl << flush; } else { jacky_os << "----LTSFInputQueue::removeStragglerEvent()--" << " There are 0 elements in the miniList for localId " << id << "! -----------" << endl << flush; } #endif // look for first element with same time objectPtr = MultiList::find(toRemove, EQUAL, id); // walk through events with same time looking for implosion or duplicate while (msgCancel == false && objectPtr != NULL) { if ((objectPtr->eventId == toRemove->eventId) && (objectPtr->sender == toRemove->sender)) { // removal if (objectPtr->sign == toRemove->sign) { // delete the (+) msg and release memory delete [] ((char *) MultiList::removeFind(id)); msgCancel = true; } else { // strange! cout << "Error: Try to remove the straggler, but sign is different -aborting" << endl; abort(); } } // Multilist moves currObj and insertObj if they point to the last // element and it's deleted. This screws up currObj[id], so this // line checks and fixes it if necessary. if (currentObj[id] != NULL && currentObj[id] == tailObj[id] && currentObj[id]->alreadyProcessed == true) { currentObj[id] = NULL; } objectPtr = MultiList< BasicEvent >::findNext(id); } // while if (msgCancel == false) { cerr << "LTSFInputQ:removeStragglerEvent() failed!" << endl; cerr << "toRemove: " << toRemove << " *toRemove: " << *toRemove << endl; print(); cerr << "LTSFInputQ printed - the process will abort" << endl; abort(); } // readjust the currentPos if (currentPos == NULL) { currentPos = currentObj[id]; } else { if (currentObj[id] != NULL && (currentObj[id]->recvTime < currentPos->recvTime || (currentObj[id]->recvTime == currentPos->recvTime && currentObj[id]->dest < currentPos->dest)) ) { currentPos = currentObj[id]; } } //end if currentPos == NULL #ifdef JACKY_DEBUG jacky_os << endl << "----LTSFInputQueue::removeStragglerEvent() -> AFTER REMOVAL ------------------" << endl << flush; //let's print out the current inputQ //jacky_os << "----AFTER REMOVAL------- Current inputQ -----------------------" << endl << flush; //cursor = (BasicEvent*)head; //while(cursor != NULL){ // jacky_os << "<" // << " id=" << cursor->eventId // << " sign=" << cursor->sign // << " " << cursor->sender << "@" << cursor->sendTime // << " => " << cursor->dest << "@" << cursor->recvTime // << " alreadyProcessed?" << cursor->alreadyProcessed << " >" << endl << flush; // cursor = cursor->next; //} //jacky_os << endl << flush; //jacky_os << "----AFTER REMOVAL-- miniList pointers for localId = " << id << " --------------" // << endl << flush; //if(headObj[id] != NULL){ // jacky_os << "\t headObj[" << id << "] = {" << *(headObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t headObj[" << id << "] = NULL" << endl << flush; //} //if(tailObj[id] != NULL){ // jacky_os << "\t tailObj[" << id << "] = {" << *(tailObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t tailObj[" << id << "] = NULL" << endl << flush; //} //if(findObj[id] != NULL){ // jacky_os << "\t findObj[" << id << "] = {" << *(findObj[id]) << "}" << endl << flush; //} else { // jacky_os << "\t findObj[" << id << "] = NULL" << endl << flush; //} //jacky_os << endl << flush; jacky_os << "-------- AFTER REMOVAL-- miniList for localId = " << id << " ----------------------" << endl << flush; if(headObj[id] != NULL){ cursor = (BasicEvent*)headObj[id]; TWMessage* temp = dynamic_cast(cursor); while(cursor != NULL){ jacky_os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " => " << cursor->dest << "@" << cursor->recvTime << " Processed?" << cursor->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ jacky_os << " [Event!]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ jacky_os << " >" << endl << flush; cursor = cursor->nextObj; } jacky_os << "--------------------- printing END ------------------------------------- " << endl << flush; } else { jacky_os << "----AFTER REMOVAL-- There are 0 elements in the miniList for localId " << id << "! -----------" << endl << flush; } #endif } #endif //end JACKY_RB_EXCEPTION ================================================================== #ifdef JACKY_NC_BP_STATE //[2006-02-02] ---------------------------------------------------------- //function to recover from the "breakpoint" state on the NC's stateQ //1. unprcess the BasicEvent passed in, which is the inputPos of the breakpoint state //2. make this unprocessed Basicevent the next event to be executed void LTSFInputQueue::reExecuteImmediately(BasicEvent *inputPos, int localId){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "LTSFInputQueue::reExecuteImmediately() Called!!!" << endl << flush; jacky_os << "LTSFInputQueue::reExecuteImmediately() -> localId = " << localId << " // toExecute = " << (*inputPos) << endl << flush; #endif if( inputPos == NULL ){ cerr << "LTSFInputQueue::reExecuteImmediately() get a NULL inputPos!" << endl; abort(); } //1. unprocess the event if( inputPos->alreadyProcessed == true ){ inputPos->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << endl << "2006-02-02 LTSFInputQueue::reExecuteImmediately() -> inputPos unprocessed!" << endl << flush; if( currentObj[localId] != NULL ){ jacky_os << "1. Current currentObj[" << localId << "] = " << currentObj[localId] << endl << flush; jacky_os << "\t1. ->[" << *(currentObj[localId]) << endl << flush; } else { jacky_os << "1. Current currentObj[" << localId << "] = NULL!" << endl << flush; } jacky_os << endl << flush; if( currentPos != NULL ){ jacky_os << "1. Current currentPos = " << currentPos << endl << flush; jacky_os << "\t1. ->[" << *(currentPos) << endl << flush; } else { jacky_os << "1. Current currentPos = NULL!" << endl << flush; } #endif } //2. adjust the pointers in the NC's inputQ so that this unprocessed event // will be selected by the scheduler and be executed immediately after the rollbacks //a) update currentPos[localId] if ( (currentObj[localId] == NULL) || (currentObj[localId] != NULL && inputPos->recvTime < currentObj[localId]->recvTime) ) { currentObj[localId] = inputPos; #ifdef JACKY_DEBUG jacky_os << "2006-02-02 LTSFInputQueue::reExecuteImmediately() -> reset currentObj[localId]!" << endl << flush; #endif } //end a) update currentObj[localId] #ifdef JACKY_DEBUG if( currentObj[localId] != NULL ){ jacky_os << "2. Current currentObj[" << localId << "] = " << currentObj[localId] << endl << flush; jacky_os << "\t2. ->[" << *(currentObj[localId]) << endl << flush; } else { jacky_os << "2. Current currentObj[" << localId << "] = NULL!" << endl << flush; } jacky_os << endl << flush; if( currentPos != NULL ){ jacky_os << "2. Current currentPos = " << currentPos << endl << flush; jacky_os << "\t2. ->[" << *(currentPos) << endl << flush; } else { jacky_os << "2. Current currentPos = NULL!" << endl << flush; } #endif //b) update currentPos if (currentPos == NULL) { currentPos = currentObj[localId]; } else if ( currentPos != NULL && ( (currentObj[localId]->recvTime < currentPos->recvTime) || (currentObj[localId]->recvTime == currentPos->recvTime && currentObj[localId]->dest < currentPos->dest) ) ) { //set currentPos to currentObj[ncLocalId] currentPos = currentObj[localId]; #ifdef JACKY_DEBUG jacky_os << "2006-02-02 LTSFInputQueue::reExecuteImmediately() -> reset currentPos!" << endl << flush; #endif } //end b) update currentPos #ifdef JACKY_DEBUG if( currentObj[localId] != NULL ){ jacky_os << "3. Current currentObj[" << localId << "] = " << currentObj[localId] << endl << flush; jacky_os << "\t3. ->[" << *(currentObj[localId]) << endl << flush; } else { jacky_os << "3. Current currentObj[" << localId << "] = NULL!" << endl << flush; } jacky_os << endl << flush; if( currentPos != NULL ){ jacky_os << "3. Current currentPos = " << currentPos << endl << flush; jacky_os << "\t3. ->[" << *(currentPos) << endl << flush; } else { jacky_os << "3. Current currentPos = NULL!" << endl << flush; } #endif } #endif //end JACKY_NC_BP_STATE [2006-02-02] ------------------------------------------------------- bool LTSFInputQueue::insert(BasicEvent *toInsert, int id) { register BasicEvent *objectPtr; bool msgCancel = false; bool inThePast = false; #ifdef JACKY_REVISION //Jacky: this flag is used to record whether the (+) msg has already been processed // when it is going to be imploded with a (-) msg bool implodeProcessedFlag = false; #endif #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------- ostream& jacky_os = JackyDebugStream::Instance().Stream(); if(toInsert->sign == POSITIVE){ TWMessage* temp = dynamic_cast(toInsert); Message* tempmsg = temp->getMessage(); jacky_os << "\tinputQ::insert() -> toInsert = {" << "id=" << toInsert->eventId << " sign=" << toInsert->sign << " " << toInsert->sender << "@" << toInsert->sendTime << " =" << tempmsg->type() << "=> " << toInsert->dest << "@" << toInsert->recvTime << flush; #ifdef JACKY_EVENT_JUMP //Nov. 22, 2005 if(temp->derivedFromExternalEvent() == true){ jacky_os << " [Event = T]" << flush; } #endif jacky_os << "}"<< endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } else { //Note: Negative BasicEvent cannot be cast to TWMessage!!! jacky_os << "\t(-)(-)(-)inputQ::insert() -> inserting Negative BasicEvent..." << endl << "\t\t toInsert = {"<< (*toInsert) << "}" << endl << flush; } #endif //-------------------------------------------------------------------------------------------------- // insert performs two functions: 1) insert (or implode) the element into both the main and appropriate // minilists. 2) figure out if the miniList should be rolledback. 3) figure out where currentPos now // belongs. 1) is simple. 2) is harder. 3) is very tricky. This code here performs 1). //1> Jacky: insert event into the Q, (+) or (-). For (-) msg, do implosion if (toInsert->sign == POSITIVE) { // if the sign is positive, we'll just insert it have fifo messaging, // there are no fast antimessages to implode with. MultiList::insert(toInsert, id); lastInserted = toInsert; #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] positiveMsgNumber++; //got a [+] message #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] } else { // inserting a negative event--will implode! #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] negativeMsgNumber++; //got a [-] message #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] #ifdef JACKY_DEBUG //---------------------------------------------------------------------------------- jacky_os << "\t(-)(-)(-)inputQ::insert() -> before implosion...." << endl << flush; jacky_os << "\t(-)(-)(-)inputQ::insert() -> call MultiList::find(==) with id = " << id << endl << flush; #endif //---------------------------------------------------------------------------------------------- // look for first element with same time objectPtr = MultiList::find(toInsert, EQUAL, id); #ifdef JACKY_DEBUG //-------------------------------------------------------------------------------------- if(objectPtr != NULL){ jacky_os << "\t(-)(-)(-)inputQ::insert() -> MultiList::find(==) gets the 1st element with same time = {" << (*objectPtr) << endl << flush; } else { jacky_os << "\t(-)(-)(-)inputQ::insert() -> MultiList::find(==) return NULL, no (+) msg with the same time found!!!" << endl << flush; } #endif //-------------------------------------------------------------------------------------------------- // walk through events with same time looking for implosion or duplicate while (msgCancel == false && objectPtr != NULL) { if ((objectPtr->eventId == toInsert->eventId) && (objectPtr->sender == toInsert->sender)) { // implosion if (objectPtr->sign != toInsert->sign) { #ifdef JACKY_REVISION // we have to remember whether we have executed the guy we just // imploded in order to decide whether to rollback or not. implodeProcessedFlag = objectPtr->alreadyProcessed; #endif //Jacky: this will delete the (+) msg and release memory! delete [] ((char *) MultiList::removeFind(id)); msgCancel = true; } else { // duplicate message--oops! cout << "Error: duplicate negevent received-aborting" << endl; abort(); } } #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------ if(msgCancel == true) { jacky_os << "\t(-)(-)(-)inputQ::insert() -> event implosion is done in the inputQ !!!!!!!!!!!!!!!" << "}" << endl << flush; //let's print out the current inputQ BasicEvent* cursor; /* jacky_os << "----(-)(-)(-)inputQ::insert()-- Current inputQ ----------------------------" << endl << flush; cursor = (BasicEvent*)head; while(cursor != NULL){ jacky_os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " => " << cursor->dest << "@" << cursor->recvTime << " alreadyProcessed?" << cursor->alreadyProcessed << " >" << endl << flush; cursor = cursor->next; } jacky_os << endl << flush; jacky_os << "----(-)(-)(-)inputQ::insert()-- miniList pointers for localId = " << id << " --------------" << endl << flush; if(headObj[id] != NULL){ jacky_os << "\t headObj[" << id << "] = {" << *(headObj[id]) << "}" << endl << flush; } else { jacky_os << "\t headObj[" << id << "] = NULL" << endl << flush; } if(tailObj[id] != NULL){ jacky_os << "\t tailObj[" << id << "] = {" << *(tailObj[id]) << "}" << endl << flush; } else { jacky_os << "\t tailObj[" << id << "] = NULL" << endl << flush; } if(findObj[id] != NULL){ jacky_os << "\t findObj[" << id << "] = {" << *(findObj[id]) << "}" << endl << flush; } else { jacky_os << "\t findObj[" << id << "] = NULL" << endl << flush; } jacky_os << endl << flush; */ jacky_os << "----(-)(-)(-)inputQ::insert()-- miniList for localId = " << id << " -----------------" << endl << flush; if(headObj[id] != NULL){ cursor = (BasicEvent*)headObj[id]; while(cursor != NULL){ jacky_os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " => " << cursor->dest << "@" << cursor->recvTime << " alreadyProcessed?" << cursor->alreadyProcessed << " >" << endl << flush; cursor = cursor->nextObj; } jacky_os << "--------------------- printing END ------------------------------------- " << endl << flush; } else { jacky_os << "----(-)(-)(-)inputQ::insert()-- There are 0 elements in the miniList for localId " << id << "! -----------" << endl << flush; } } #endif //----------------------------------------------------------------------------------------------------------- // Multilist moves currObj and insertObj if they point to the last // element and it's deleted. This screws up currObj[id], so this // line checks and fixes it if necessary. if (currentObj[id] != NULL && currentObj[id] == tailObj[id] && currentObj[id]->alreadyProcessed == true) { currentObj[id] = NULL; #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------------ jacky_os << "\tNOTE: (-)(-)(-)inputQ::insert() -> try to set currentObj[id] = NULL," << " we just remove the LAST one in the miniList!!!!!!" << endl << flush; #endif //----------------------------------------------------------------------------------------------------------- } objectPtr = MultiList< BasicEvent >::findNext(id); } // while //if (msgCancel == true) // Added line to see if it works - Ezequiel Glinsky, 2004 if (msgCancel == false) // Original line { #ifdef STATEDEBUG cout << " Inside the printing of the fast antimessage problem " << endl; *outFile << "LTSFInputQ: fast antimessage found!\n"; cout << toInsert << " " << *toInsert << "\n"; print(*outFile); abort(); #else cerr << "LTSFInputQ: fast antimessage found!" << endl; cerr << "toInsert: " << toInsert << " *toInsert: " << *toInsert << endl; print(); cerr << "LTSFInputQ printed - the process will abort" << endl; abort(); #endif } // else { //Jacky: continue ONLY when msgCancel = TRUE! // //cerr << "LTSFInputQ printed - the process will NOT abort" << endl; // } } // if toInsert->sign == POSITIVE // 2> Check if the miniQueue should be rolledback #ifndef JACKY_REVISION inThePast = miniListRollback(toInsert,id); #else //Jacky's version : adding a flag to determine secondary rollback for (-) msgs inThePast = miniListRollback(toInsert,id, implodeProcessedFlag); #endif #ifdef STATEDEBUG // if (inThePast == true) { // printSmallQ(*outFile, id); // *outFile << "The lvt of this object is " << lVTArray[id] << endl; // if (currentObj[id] != NULL){ // *outFile << *currentObj[id] << endl; // } // else { // *outFile << "The current Object now is null" << endl; // } // } #endif //------------------------------------------------------------------------------------------------------------- //if currentPos is NULL, the list is either empty, or everything left // in it has been processed. This means a positive or negative will // rollback one queue and the first unprocessed message in that queue // should be processed next. if (currentPos == NULL) { currentPos = currentObj[id]; } else { if (toInsert->sign == NEGATIVE) { // For a negative message in the past, if the next message to execute // in the miniqueue is before currentPos then we should reset currentPos. // NOTE: The paraenthesis around the third part of the expression // are necessary since && has higher precedence than ||, so do not remove them! if (inThePast == true && currentObj[id] != NULL && (currentObj[id]->recvTime < currentPos->recvTime || currentObj[id]->recvTime == currentPos->recvTime && // Here "<=" implies that our assumption is that // when we get events at the same time and for the // same destination, we insert before the first such // instance of this event at this time and dest. // Refer to MultiList::insert //!!!! 26/01/2001 currentObj[id]->dest < currentPos->dest) ) { currentPos = currentObj[id]; } } else { // Positive message, so we adjust the current pointer // to this message if it's recieve time is less than the current's, // or at the same time with a destination less than the current's // (thus in current's past). if (currentObj[id] != NULL && (currentObj[id]->recvTime < currentPos->recvTime || (currentObj[id]->recvTime == currentPos->recvTime && // Here "<=" implies that our assumption is that // when we get events at the same time and for the // same destination, we insert before the first such // instance of this event at this time and dest. // Refer to MultiList::insert // !!!! 26/01/2001 currentObj[id]->dest < currentPos->dest)) ) { currentPos = currentObj[id]; } } } //end if currentPos == NULL //-------------------------------------------------------------------------------------------------------------- #ifdef JACKY_DEBUG if(currentPos != NULL){ jacky_os << "\tinputQ::insert() -> set currentPos = {" << (*currentPos) << "}" << endl << flush; } else { jacky_os << "\tinputQ::insert() -> set currentPos = NULL" << endl << flush; } if(currentObj[id] != NULL){ jacky_os << "\tinputQ::insert() -> set currentObj[" << id << "] = {" << (*currentObj[id]) << "}" << endl << flush; } else { jacky_os << "\tinputQ::insert() -> currentObj[" << id << "] = NULL" << endl << flush; } #endif //end JACKY_DEBUG ------------------------------------------------------------------------------------- if(msgCancel == true){ // message implosion - delete event delete [] ((char *)toInsert); } #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------- jacky_os << "\tinputQ::insert() -> END, return inThePast = " << inThePast << endl << endl << flush; #endif //------------------------------------------------------------------------------------------------------ #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] if(inThePast == true){ //we need to do rollbacks rollbackNumber++; if(toInsert->sign == POSITIVE){ //if the rollback is caused by a [+] straggler message, increase "positiveStragglerMsgNumber" positiveStragglerMsgNumber++; } else { //if the rollback is caused by a [-] message, increase "negativeStragglerMsgNumber" negativeStragglerMsgNumber++; } } #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] return inThePast; } //Jacky Note: //This function is to find the event on the inputQ based on the "findTime" (rather than an actual event!) //used by TW::coastForward() BasicEvent* LTSFInputQueue::find(VTime& findTime, int objId, findMode_t mode, int localId){ static BasicEvent key; BasicEvent* findEvent; key.recvTime = findTime; key.dest = objId; findEvent = MultiList::find(&key, mode, localId); return(findEvent); } BasicEvent* LTSFInputQueue::getEventToSchedule() { BasicEvent* nextEvent = get(); while ((nextEvent != NULL) && (nextEvent->alreadyProcessed == true)) { nextEvent = seek(1,CURRENT); } return nextEvent; } BasicEvent* LTSFInputQueue::getAndSeek(int id, int localId) { // note: this is the object id, not the localId relevant to the MultiQueue BasicEvent *toExecute = get(); // toExecute is now either NULL or a valid + message if (toExecute != NULL && toExecute->dest == id) { // move to the next message, but not if the id changes toExecute->alreadyProcessed = true; // needed for VHDL if(currentObj[localId] != NULL){ currentObj[localId] = currentObj[localId]->nextObj; } lVTArray[localId] = toExecute->recvTime; seek(1, CURRENT); } else { toExecute = NULL; } return toExecute; } BasicEvent* LTSFInputQueue::getAndSeekObj(int id, int localId) { // note: this is the object id, not the localId relevant to the MultiQueue BasicEvent *toExecute = getObj(localId); // toExecute is now either NULL or a valid + message if (toExecute != NULL && toExecute->dest == id) { toExecute->alreadyProcessed = true; // needed for VHDL if(currentObj[localId] != NULL){ currentObj[localId] = currentObj[localId]->nextObj; } lVTArray[localId] = toExecute->recvTime; } else { toExecute = NULL; } return toExecute; } //[2006-03-20] //Jacky: this function is called by GVTManager::gcollect() when InfreqStateManager is used! int LTSFInputQueue::gcollect(VTime& gtime, int localId) { register BasicEvent *delptr, *tempPtr; int nodeleted = 0; delptr = headObj[localId]; if (delptr != NULL) { //Jacky: delete BasicEvent* with recvTime < gVT on the miniList for simuObj "localId" while (delptr != NULL && delptr->recvTime < gtime) { noOfEventsCommitted++; //Jacky: increase "number-of-events-committed" nodeleted++; //Jacky: increase "number-of-events-deleted" // InputQ sanity check to see that we have processed all messages that we are deleting. // This line also contains a workaround for Optimistic Fossil Collection which may need // to remove everything from the Queue regardless if it has been processed or not. if(delptr->alreadyProcessed == 0) { cout << "Attempt to delete unprocessed messages!" << endl; } tempPtr = delptr->nextObj; MultiList::remove(delptr, localId); delete [] ((char *)delptr); delptr = tempPtr; } return(noOfEventsCommitted); } else { //delptr = NULL (i.e. headObj[localId] = NULL) return 0; } } //[2006-03-20] //Jacky: this function is called by GVTManager::gcollect() when StateManager is used! int LTSFInputQueue::gcollect(VTime gtime, int localId, BasicEvent* ptrMarker) { register BasicEvent *delptr, *tempPtr; int nodeleted = 0; // The inputPointer passed in is the point up to garbage has to be collected // Jacky: this the the "inputPos" of the LAST state with lVT < gVT. That state will be left on the stateQ // in the place of the original initial state! // Starting from the head, garbage collect all objects up to this point. // If the pointer passed in is null do not garbage collect anything. delptr = headObj[localId]; if ( (ptrMarker != NULL) || (ptrMarker == NULL) ) { //Jacky: this condition is nothing!!! while (delptr != NULL && delptr != ptrMarker && delptr->recvTime < gtime){ noOfEventsCommitted++; nodeleted++; // InputQ sanity check to see that we have processed all messages that we are deleting. // This line also contains a workaround for Optimistic Fossil Collection which may need // to remove everything from the Queue regardless if it has been processed or not. if(delptr->alreadyProcessed == 0) { cout << "Attempt to delete unprocessed messages!" << endl; cout << *delptr << endl; print(); abort(); } tempPtr = delptr->nextObj; MultiList::remove(delptr, localId); delete [] ((char *)delptr); delptr = tempPtr; } return(noOfEventsCommitted); } else { return 0; } } ostream& operator << (ostream& os, const LTSFInputQueue& Iq) { Iq.print(os); return os; } void LTSFInputQueue::print(ostream& out) const { //out << "\n\nLTSFInputQueue print called\n"; cerr << "\n\nLTSFInputQueue print called\n"; //out << "total list length is " << listsize << "\n"; cerr << "total list length is " << listsize << "\n"; SortedListOfEvents::print(out); } void LTSFInputQueue::printMiniList(int i) { //cout << "\n\nLTSFInputQueue print called\n"; //cout << "total list length is " << listsize << "\n"; cout << "MiniList " << i << ": length = " << listSize[i] << "\n"; if (headObj[i] != NULL) { cout << "HeadObj = " << headObj[i] << " " << *headObj[i] << "\n"; } if (tailObj[i] != NULL) { cout << "TailObj = " << tailObj[i] << " " << *tailObj[i] << "\n"; } if (findObj[i] != NULL) { cout << "FindObj = " << findObj[i] << " " << *findObj[i] << "\n"; } if (insertObj[i] != NULL) { cout << "InsertObj = " << insertObj[i] << " " << *insertObj[i] << "\n"; } if (currentObj[i] != NULL) { cout << "CurrentObj = " << currentObj[i] << " " << *currentObj[i] << "\n"; } printSmallQ(cout,i); cout.flush(); } #ifdef JACKY_DEBUG // Nov. 17, 2005 ---------------------------------------------------------- //function to print the inputQ for the given processor void LTSFInputQueue::printMiniInputQ( ostream& os, int localId ){ BasicEvent* cursor; os << "--------------------- Current inputQ ------------------------" << endl << flush; os << "---------------------- MainList pointers --------------------" << endl << flush; if(head != NULL){ os << "\t head = {" << *(head) << "}" << endl << flush; } else { os << "\t head = NULL" << endl << flush; } if(tail != NULL){ os << "\t tail = {" << *(tail) << "}" << endl << flush; } else { os << "\t tail = NULL" << endl << flush; } if(findPos != NULL){ os << "\t findPos = {" << *(findPos) << "}" << endl << flush; } else { os << "\t findPos = NULL" << endl << flush; } if(insertPos != NULL){ os << "\t insertPos = {" << *(insertPos) << "}" << endl << flush; } else { os << "\t insertPos = NULL" << endl << flush; } if(currentPos != NULL){ os << "\t currentPos = {" << *(currentPos) << "}" << endl << flush; } else { os << "\t currentPos = NULL" << endl << flush; } os << endl << flush; os << "----- miniList pointers for localId = " << localId << " -----" << endl << flush; if(headObj[localId] != NULL){ os << "\t headObj[" << localId << "] = {" << *(headObj[localId]) << "}" << endl << flush; } else { os << "\t headObj[" << localId << "] = NULL" << endl << flush; } if(tailObj[localId] != NULL){ os << "\t tailObj[" << localId << "] = {" << *(tailObj[localId]) << "}" << endl << flush; } else { os << "\t tailObj[" << localId << "] = NULL" << endl << flush; } if(findObj[localId] != NULL){ os << "\t findObj[" << localId << "] = {" << *(findObj[localId]) << "}" << endl << flush; } else { os << "\t findObj[" << localId << "] = NULL" << endl << flush; } if(insertObj[localId] != NULL){ os << "\t insertObj[" << localId << "] = {" << *(insertObj[localId]) << "}" << endl << flush; } else { os << "\t insertObj[" << localId << "] = NULL" << endl << flush; } if(currentObj[localId] != NULL){ os << "\t currentObj[" << localId << "] = {" << *(currentObj[localId]) << "}" << endl << flush; } else { os << "\t currentObj[" << localId << "] = NULL" << endl << flush; } os << endl << flush; os << "---------- miniList for localId = " << localId << " --------------" << endl << flush; if(headObj[localId] != NULL){ cursor = (BasicEvent*)headObj[localId]; while(cursor != NULL){ TWMessage* temp = dynamic_cast(cursor); Message* tempmsg = temp->getMessage(); os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " =" << tempmsg->type() << "=> " << cursor->dest << "@" << cursor->recvTime << " Processed?" << cursor->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ os << " [Event!]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ os << " > [Address = " << cursor << "]" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif cursor = cursor->nextObj; }//end while os << "--------------------- printing END ------------------------------------- " << endl << flush; } else { os << "------ There are 0 elements in the miniList for localId " << localId << "! --------" << endl << flush; } } //function to print the whole inputQ void LTSFInputQueue::printFullInputQ( ostream& os ){ BasicEvent* cursor; os << "--------------------- Current inputQ ------------------------" << endl << flush; os << "---------------------- MainList pointers --------------------" << endl << flush; if(head != NULL){ os << "\t head = {" << *(head) << "}" << endl << flush; } else { os << "\t head = NULL" << endl << flush; } if(tail != NULL){ os << "\t tail = {" << *(tail) << "}" << endl << flush; } else { os << "\t tail = NULL" << endl << flush; } if(findPos != NULL){ os << "\t findPos = {" << *(findPos) << "}" << endl << flush; } else { os << "\t findPos = NULL" << endl << flush; } if(insertPos != NULL){ os << "\t insertPos = {" << *(insertPos) << "}" << endl << flush; } else { os << "\t insertPos = NULL" << endl << flush; } if(currentPos != NULL){ os << "\t currentPos = {" << *(currentPos) << "}" << endl << flush; } else { os << "\t currentPos = NULL" << endl << flush; } os << endl << flush; if( head != NULL ){ cursor = (BasicEvent*)head; while(cursor != NULL){ TWMessage* temp = dynamic_cast(cursor); Message* tempmsg = temp->getMessage(); os << "<" << " id=" << cursor->eventId << " sign=" << cursor->sign << " " << cursor->sender << "@" << cursor->sendTime << " =" << tempmsg->type() << "=> " << cursor->dest << "@" << cursor->recvTime << " Processed?" << cursor->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ os << " [Event!]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ os << " > [Address = " << cursor << "]" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif cursor = cursor->next; } //end while }//end if head != NULL os << "--------------------- printing END --------------------------" << endl << flush; os << endl << flush; } #endif // end JACKY_DEBUG -------------------------------------------------------------------- #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //function to unprocess the BasicEvent from which an (X) message is derived. //this is used for the NC to unprocess (X) messages in the NCBag that have recvTime > minTime void LTSFInputQueue::unprocessXEvent(BasicEvent* toUnprocess, int localId){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t\tBefore unprocess " << toUnprocess << ", NC inputQ is -------->" << endl << flush; printMiniInputQ( jacky_os, localId ); #endif //1. unprocess the event if( toUnprocess->alreadyProcessed == true ){ toUnprocess->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << endl << "2006-02-03 LTSFInputQueue::unprocessXEvent() -> " << toUnprocess << " unprocessed!" << endl << flush; #endif } //2. adjust currentObj[localId] & currentPos if necessary if ( (currentObj[localId] == NULL) || (currentObj[localId] != NULL && toUnprocess->recvTime < currentObj[localId]->recvTime) ) { currentObj[localId] = toUnprocess; #ifdef JACKY_DEBUG jacky_os << "2006-02-02 LTSFInputQueue::unprocessXEvent() -> reset currentObj[localId]!" << endl << flush; #endif } //end update currentObj[localId] /* if (currentPos == NULL) { currentPos = currentObj[localId]; } else if ( currentPos != NULL && ( (currentObj[localId]->recvTime < currentPos->recvTime) || (currentObj[localId]->recvTime == currentPos->recvTime && currentObj[localId]->dest < currentPos->dest) ) ) { //set currentPos to currentObj[ncLocalId] currentPos = currentObj[localId]; #ifdef JACKY_DEBUG jacky_os << "2006-02-02 LTSFInputQueue::unprocessXEvent() -> reset currentPos!" << endl << flush; #endif } //end update currentPos */ #ifdef JACKY_DEBUG jacky_os << "\t\tAfter unprocess " << toUnprocess << ", NC inputQ is -------->" << endl << flush; printMiniInputQ( jacky_os, localId ); #endif } //function to adjust the VTime in lVTArray[] for simuObj with localId //this is used for the NC to adjust the lVTArray[ncLocalId] to the minTime after resending //(X) messages in the NCBag void LTSFInputQueue::resetLVTArrayTime(VTime newTime, int localId){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t\tBefore reset, lVTArray[" << localId << "] = " << lVTArray[localId].asString() << endl << flush; #endif lVTArray[localId] = newTime; #ifdef JACKY_DEBUG jacky_os << "\t\tAfter reset, lVTArray[" << localId << "] = " << lVTArray[localId].asString() << endl << flush; #endif } #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif // LTSFINPUTQUEUE_CC