/*******************************************************************
*
*  Declaration: This class is partially based on Glinsky's version that
*               can be found in the "oldsource" directory.
*
*  DESCRIPTION: class ParallelNodeCoordinator
*
*  AUTHOR: Qi (Jacky) Liu
*
*  EMAIL: mailto://liuqi@sce.carleton.ca
*
*  DATE: Sept, 2005
*
*******************************************************************/

/** include files **/
#include "pncoord.h"		// head file
#include "pfcoord.h"		// class ParallelFlatCoordinator
#include "pNCoordState.h"	// class ParallelNodeCoordinatorState
#include "evaldeb.h"		// ShowVirtualTimeWhenFinish
#include "message.h"		// class Message
#include "pmessage.h"
#include "pmodeladm.h"		// class SingleParallelModelAdm
#include "coupled.h"		// class Coupled
#include "parsimu.h"		// class ParallelMainSimulator
#include "log.h"		// class Log
#include <time.h>		//[2006-01-24] time(), ctime()

/*private functions*/

/*******************************************************************
* Function Name: private constructor
* Description: the ParallelProcessorAdmin is a friend class of this
*              ParallelNodeCoordinator, this private constructor is
*              only used by the ParallelProcessorAdmin to generate a NC
* Parameters:  ncmdl - the NodeCoordinatorModel handed on to the
*              ParallelProcessor base class
********************************************************************/
ParallelNodeCoordinator::ParallelNodeCoordinator( NodeCoordinatorModel* ncmdl)
	: ParallelProcessor( ncmdl )
{
	//set up the connection to the only child, the local FC:
	//the FC id is looked up in the simulator's flatCoordinatorsList using this machine's id
	localFCId = ParallelMainSimulator::Instance().flatCoordinatorsList[ParallelMainSimulator::Instance().getMachineID()] ;

#if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-24] ---------------------------------
	//if we are using InfreqStateManager, we need to set "doStateSaving" flag in InfreqStateManager to TRUE for NC
	//this will, in fact, turn the InfreqStateManager into the StateManager for the NC!
	state->setDoStateSaving(true);

	//the NC will NOT do coast forward during rollbacks
	//set the statePeriod to -1 in the InfreqStateManager!
	state->setStatePeriod(-1);
#endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER [2006-03-24] --------------------------------------------
}

/*******************************************************************
* Function Name: isSendingToEnvironment()
* Description: test whether the specified port will eventually connect
*              to an output port of the TOP model
* Parameters:  destPort - the port whose influence list is traced
* Returns:     true as soon as one influenced port (directly, or through
*              nested coupled models) belongs to a coupled model whose
*              description() is "top"; false otherwise
********************************************************************/
bool ParallelNodeCoordinator::isSendingToEnvironment(const Port& destPort){
	//all ports that will receive the Y msg
	const InfluenceList &influList( destPort.influences() );

	//Note: the receiving port may belong to an atomic model (we don't care about this case) or to a
	//coupled model. Coupled receivers are followed recursively; the search stops at the first
	//receiver that sits on the TOP model.
	for( InfluenceList::const_iterator cursor = influList.begin(); cursor != influList.end(); ++cursor ) {
		if( ((*cursor)->model()).isAtomic() ) {
			continue;	//atomic receivers can never be the environment
		}

		if( ((*cursor)->model()).description() == "top" ){
			return true;	//receiving port belongs to the TOP model
		}

		if( isSendingToEnvironment(*(*cursor)) ) {
			return true;	//the TOP model was reached further along the chain
		}
	}

	return false;
}

/*******************************************************************
* Function Name: isSendingToLocalFC()
* Description: test whether an external event will influence a local
*              simulator under the control of the local FC
********************************************************************/ bool ParallelNodeCoordinator::isSendingToLocalFC(const Port& eventPort){ //we need to recersively trace the influenceList of the event port //to see whether a port on a local simulator is the destination bool result = false; const InfluenceList &influList( eventPort.influences() ); //influList holds all ports on the TOP model that will receive the event InfluenceList::const_iterator cursor( influList.begin() ); for(; cursor != influList.end(); cursor++){ if(((*cursor)->model()).isAtomic()) { //if the receiving port is on an atomic model //test if the simulator is a local one if( ((*cursor)->model()).localProc() != ParallelProcessor::InvalidId){ result = true; break; //break the for loop } } else { //the receiving port is on a coupled model //we need to search recersively until an atomic receiving model is found result = isSendingToLocalFC(*(*cursor)); if(result) { break; //break the for loop } } } return result; } /******************************************************************* * Function Name: findRemoteNCs() * Description: find all remote NCs that will receive an (X) msg, the procIds * of these remote NCs are stored in "remoteNCList" * Note: After calling this function, if the resulting size of the "remoteNCList" * is zero, no (X) msg should be send to other NCs. 
This can be used to * test whether a (Y) msg from the FC will be forwarded to other NCs ********************************************************************/ ParallelNodeCoordinator& ParallelNodeCoordinator::findRemoteNCs(const Port& destPort){ InfluenceList influList = destPort.influences(); //the original influenceList for the specified port InfluenceList::const_iterator cursor = influList.begin(); for(; cursor != influList.end(); cursor++){ //iterate over all ports that will receive msg from the specified port if(((*cursor)->model()).isAtomic()) { //if the receiving port is on an atomic model //get the procId of the corresponding simulator ProcId localProcId = ((*cursor)->model()).localProc(); //check whether this simulator is on a remote machine if(localProcId == ParallelProcessor::InvalidId){ // it is on a remote machine //get the number of the machine on which it is running from model's masterId ProcId masterProcId = ((*cursor)->model()).masterId(); MachineId mid = ((*cursor)->model()).machineForProcId(masterProcId); //find out the procId of the remote NC ProcId remoteNCprocId = ParallelMainSimulator::Instance().nodeCoordinatorsList[mid]; //add the procId of the remote NC into the "remoteNCList" getRemoteNCList().insert(remoteNCprocId); //Note: Since the "remoteNCList" is a set, no duplications of a remote NC's procId //exist in it } } else { //the receiving port is on a coupled model //we need to search recersively until an atomic receiving model is found findRemoteNCs(*(*cursor)); } } return *this; } /******************************************************************* * Function Name: hasUnprocessedEventWithSmallestTimeBeforeStop() * Description: function to check whether there are unprocessed events in the inputQ * that have the same timestamp as those (X) messages already in the * NCMessageBag and before the stop time. Also, the receiver of these events * should be the NC itself. 
* return TRUE if there are unprocessed events on the inputQ that have * (recvTime <= stopTime) && (recvTime == bag.time()) * return FALSE if otherwise. ********************************************************************/ bool ParallelNodeCoordinator::hasUnprocessedEventWithSmallestTimeBeforeStop(){ bool result = false; //just go through the inputQ to find the 1st unprocessed event's recvTime BasicEvent* eventCursor = (BasicEvent*)(BasicTimeWarp::inputQ.getHead()); while(eventCursor != NULL){ if( (eventCursor->alreadyProcessed == false) //unprocessed && ((eventCursor->recvTime < stopTime()) || (eventCursor->recvTime == stopTime())) //before STOP && (getNCBag().time() == eventCursor->recvTime) //has same time as (X) msgs already in the bag //Nov. 21, 2005 //the following condition is added because there may be some events on the inputQ that have //different receivers, like the FC. Actually, the NC should not be DORMANT in this case, but //in the case of "event-jump", an (X) msg derived from an external event may be left on the inputQ //whose receiver is the FC. && (eventCursor->dest == id()) //Nov. 21, 2005. Is the receiver of the events is me? ){ return true; } eventCursor = eventCursor->next; } //end while return result; } #ifdef JACKY_RB_EXCEPTION //================================================================================== /******************************************************************* * Function Name: peekNextEventToBeExecuted() * Description: function to peek ahead the next event to be executed in the current inputQ * return the event to be executed next, or NULL if no to-be-executed event left * on the current inputQ * Nov. 
3, 2005 ********************************************************************/ BasicEvent* ParallelNodeCoordinator::peekNextEventToBeExecuted(){ /* #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); TWMessage* nextToExecuteMsg; #endif */ //get the event pointed by "currentPos" of the inputQ BasicEvent* nextToExecuteEvent = BasicTimeWarp::inputQ.get(); //move to the event to be executed next, or NULL if there is no such event on the current inputQ while( (nextToExecuteEvent != NULL) && (nextToExecuteEvent->alreadyProcessed == true) ){ nextToExecuteEvent = nextToExecuteEvent->next; } /* #ifdef JACKY_DEBUG if( nextToExecuteEvent != NULL ){ //next to-be-executed event found nextToExecuteMsg = (TWMessage*)nextToExecuteEvent; jacky_os << "EXCEPTION CHECK!!! nextEvent -> id=" << nextToExecuteEvent->eventId << " sign=" << nextToExecuteEvent->sign << " " << nextToExecuteEvent->sender << "@" << nextToExecuteEvent->sendTime << " =" << nextToExecuteMsg->getMessage()->type() << "=> " << nextToExecuteEvent->dest << "@" << nextToExecuteEvent->recvTime << " Processed?" << nextToExecuteEvent->alreadyProcessed << endl << flush; } else { jacky_os << "EXCEPTION CHECK!!! nextEvent -> NULL!!!" << endl << flush; } #endif //end JACKY_DEBUG */ return nextToExecuteEvent; } /******************************************************************* * Function Name: hasUnprocessedCollectMessageAt(const VTime&) * Description: function to check whether there is already a @ message with the * given recvTime that has not yet been processed on the current inputQ. * This fucntion is used by receive(DoneMessage) to handle exceptional * situations during rollbacks * return TRUE if such message exists, or FALSE if otherwise * Nov. 
3, 2005 ********************************************************************/ bool ParallelNodeCoordinator::hasUnprocessedCollectMessageAt(const VTime& thisTime){ bool result = false; //just go through the inputQ BasicEvent* eventCursor = (BasicEvent*)(BasicTimeWarp::inputQ.getHead()); TWMessage* nextToExecuteMsg; Message* nextMsg; while(eventCursor != NULL){ if( eventCursor->alreadyProcessed == false //unprocessed && eventCursor->recvTime == thisTime //has recvTime equal to the given time ){ nextToExecuteMsg = (TWMessage*)eventCursor; nextMsg = nextToExecuteMsg->getMessage(); //this will create a NEW message!!! if( nextMsg->type() == "@" ) { //it's a @ msg #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "[EXCEPTION CHECK!!!] ====> same @ message found!!!" << endl << flush; #endif result = true; } #ifdef JACKY_DESTROY_MESSAGE delete nextMsg; #endif } eventCursor = eventCursor->next; } //end while return result; } #endif // end JACKY_RB_EXCEPTION ============================================================================= /******************************************************************* * Function Name: sortMessagesInBag() * Description: function to send out (X) messages in the bag followed by a (*) message to the FC. * This function will set lastChange to the time of the bag, and nextChange to 0. * It will also set the nextMessageType to @. After this function is invoked, the * NCBag is properly cleaned *********************************************************************/ void ParallelNodeCoordinator::sortMessagesInBag(){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); #endif //Set the time for next change to now. 
lastChange( getNCBag().time() ); nextChange( VTime::Zero ); //we need to send the (X) in bag that have MIN time to the FC MessageBag::MessageList xMsgList = getNCBag().getMessageWithMinTime(); MessageBag::MessageList::const_iterator xMsgCursor = xMsgList.begin(); for( ; xMsgCursor != xMsgList.end(); xMsgCursor++ ) { const BasicMsgValue* pmsgVal = (*xMsgCursor)->value(); ExternalMessage extMsg( (*xMsgCursor)->time(), id(), (*xMsgCursor)->port(), ((RealMsgValue*)(pmsgVal))->v ); const BasicExternalMessage* xmsgPtr = dynamic_cast(*xMsgCursor); extMsg.senderModelId( xmsgPtr->senderModelId() ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] CHECK!!! NC " << this->id() << " sends (X) msg in NCBag / time = " << ((*xMsgCursor)->time()).asString() << " / senderId = " << id() << " / port = " << ((*xMsgCursor)->port()).name() << " / value = " << ((RealMsgValue*)(pmsgVal))->asString() << " / senderModelId = " << xmsgPtr->senderModelId() << endl << flush; #endif send( extMsg, localFCId ); } //end for #ifdef JACKY_NCBAG_POINTERS //[2006-02-03]----------------------------------------------------------------- //Whenever we remove (X) messages from the NCBag, we should also remove elements in the NCBagReference! //remove elements in the NCBagReference with key = NCBag.time() getBagRef().erase( getNCBag().time() ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03]------------------------------------------------------------- getNCBag().removeMessageWithMinTime(); //clear the (X) msgs in the bag we have sent //we need to send out the (*) msg to trigger externalFunction in the simulator //Set the next message type to (@) sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] CHECK!!! 
NC " << this->id() << " set lastChange = " << lastChange() << " / nextChange() = " << nextChange() << " -> sends out (*) msg with absoluteNext() = " << absoluteNext().asString() << endl << flush; #endif InternalMessage intMsg( absoluteNext(), id()); send ( intMsg, localFCId ); } /*protected functions*/ /******************************************************************* * Function Name: calculateMinTime() * Description: calculate the MIN time among * 1> (X) msgs in the NCMessageBag * 2> external events in the EventList, and * 3> the absoluteNext() of this NC * this function is used in the receive(DoneMessage&) ********************************************************************/ VTime ParallelNodeCoordinator::calculateMinTime() { VTime minTime(VTime::Inf); //1> is there any external events? if( !endOfEvents() && (minTime > (currentEvent()->time)) ) { //there are events and their time is less than the minTime minTime = currentEvent()->time; } //now, the minTime is the minimum time among external events //2> is there any (X) msg from other NCs? if( (getNCBag().size() > 0) && (minTime > getNCBag().time()) ){ //there are (X) msgs from other NCs and their time is less than the minTime minTime = getNCBag().time(); } //now, the minTime is the minimum time among external events & (X) msgs from other NCs //3> shall we trigger internal transition first? if(minTime > absoluteNext()) { minTime = absoluteNext(); } //now, minTime is the MIN time among the three return minTime; } /******************************************************************* * Function Name: rollbackCheck() * Description: Checks if a rollback has taken place and cleans local variables. 
*              This is a virtual function defined in ParallelProcessor; we override it
*              here to clean our specific local variables
* Note: this function is called in ParallelProcessor::executeProcess()
********************************************************************/
/*
bool ParallelNodeCoordinator::rollbackCheck(const VTime& currentTime){
	//1> call base class' version to restore the "lastMsgTime"
	bool rollback = ParallelProcessor::rollbackCheck( currentTime );

	//2> if rollback happened, clean other local variables
	//Note: a> the EventList should NOT be cleaned. The eventsCursor should NOT be restored since previous
	//         external events that have been sent out by the ParallelNodeCoordinator are saved in the inputQ,
	//         and when there is a rollback, these msgs in the inputQ will be unprocessed and reprocessed by the
	//         TimeWarp kernel.
	//      b> stop time is unchanged
	//      c> localFCId is unchanged
	//      d> TimeWarp should be able to restore the "nextMsgType" and "dormant", which are part of the
	//         ParallelNodeCoordinatorState
	if ( rollback ) {
		getRemoteNCList().clear(); //clear the remoteNCList
		getNCBag().eraseAll(); //clear the NCMessageBag
	}
	return rollback;
}
*/

/*******************************************************************
* Function Name: sortOutputMessage(BasicOutputMessage &)
* Description: this function processes the Y msg received from the local FC.
*              It finds out whether the Y msg should be output to the environment.
*              if so, the Y msg is forwarded to the Root; it also finds out whether
*              the Y msg is being sent to remote simulators.
if so, The NC checks what * remote NCs should receive the msg.It translates the received Y msg to * X msgs and relays them to the remote NCs *******************************************************************/ ParallelNodeCoordinator &ParallelNodeCoordinator::sortOutputMessage( const BasicOutputMessage &msg ) { bool sendout = false; //test whether this NC gets a correct (Y) msg //Note: the Y msg should always coming from the child FC MASSERTMSG( msg.procId() == localFCId, "NC::sortOutputMessage() -> NC gets wrong Y msg!" ); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os < need send to the environment? if ( isSendingToEnvironment(msg.port()) ) { #ifdef JACKY_DEBUG jacky_os << "[" << ParallelMainSimulator::Instance().getMachineID() << "] NC.sortOutputMessage() at time " << msg.time() << " sending (Y) msg to Root" << endl << flush; #endif BasicOutputMessage outMsg( msg ); outMsg.procId( id() ); sendout = true; //send the (Y) msg to the Root send(outMsg, ParallelProcessorAdmin::rootId); } //2> need send to remote NCs? //now find out those remote receiving NCs MASSERTMSG( getRemoteNCList().size() == 0, "NC::sortOutputMessage() -> Garbage in the remoteNCList!" ); findRemoteNCs(msg.port()); #ifdef JACKY_DEBUG jacky_os << "[" << ParallelMainSimulator::Instance().getMachineID() << "] NC.sortOutputMessage() at time " << msg.time() << " : NC " << id() << " need to send (X) msg to " << getRemoteNCList().size() << " remote NCs..." 
<< endl << flush; #endif //now the "remoteNCList" is loaded with procIds of remote NCs ParallelNodeCoordinatorState::ProcSet::const_iterator cursor( getRemoteNCList().begin() ) ; BasicExternalMessage extMsg; //X msg to be sent to remote NCs extMsg.time( msg.time() ); extMsg.procId( id() ); extMsg.value( msg.value()->clone() ); extMsg.port( msg.port() ) ; extMsg.senderModelId( msg.port().modelId() ); #ifdef JACKY_DEBUG jacky_os << "\t\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC.sortOutputMessage() at time " << msg.time() << " : NC " << id() << " create (X) msg with senderModelId = " << msg.port().modelId() << endl << flush; #endif if( cursor != getRemoteNCList().end() ){// the remoteNCList is not empty sendout = true; } //send out the (X) msgs to these remote NCs for( ; cursor != getRemoteNCList().end(); cursor++ ) { #ifdef JACKY_DEBUG jacky_os << "[" << ParallelMainSimulator::Instance().getMachineID() << "] NC.sortOutputMessage() at time " << msg.time() << " sending (X) msg to remote NCs " << (*cursor) << endl << flush; #endif send( extMsg, (*cursor) ) ; }//end for //don't forget to clear the remoteNCList getRemoteNCList().clear(); MASSERTMSG( sendout == true, "NC::sortOutputMessage() -> (Y) msg is sent to nowhere!" ); MASSERTMSG( getRemoteNCList().size() == 0, "NC::sortOutputMessage() -> remoteNCList not cleared!" ); return *this; } /*public functions*/ /******************************************************************* * Function Name: initialize() * Description: required by TimeWarp. This function initializes the * lastChange() & nextChange() of this NC, the stopTime, * the ParallelNodeCoordinatorState::NextMsgType, clear the * externalEvents * Note: when this function is called, the external events are not * yet loaded into the externalEvents! 
********************************************************************/ void ParallelNodeCoordinator::initialize() { ParallelProcessor::initialize(); //set up the FileQueue for generating log file lastChange( VTime::Zero ); nextChange( VTime::Inf ); stopTime( ParallelMainSimulator::Instance().loader()->stopTime() ); sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); isDormant(false); //dormant is initialized to FALSE getRemoteNCList().clear(); externalEvents.erase( externalEvents.begin(), externalEvents.end() ) ; eventsBegin(); //initialize the "eventsCursor" to NULL getNCBag().eraseAll(); //initialize (clear) the NCMessageBag #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] getBagRef().clear(); //clear the NCBagReference #endif //end JACKY_NCBAG_POINTERS [2006-02-03] //we should not sort the externalEvents here since the events have not yet been loaded into the EventList //externalEvents.sort(); ParallelMainSimulator::Instance().debugStream() << "Initializing NodeCoordinator: OK" << endl << flush; } /******************************************************************* * Function Name: addExternalEvent() * Description: load one external event into the NC ONLY if the event will influence a LOCAL simulator later! * For a single machine, this test has no side-effect since all simulators are on the same machine. * Thus, all external events will eventually influence at least one local simulator. ********************************************************************/ ParallelNodeCoordinator &ParallelNodeCoordinator::addExternalEvent( const VTime &t, const Port &p, const Real &r ) { if( isSendingToLocalFC(p) ){ //load event only if it will go to a local simulator! 
externalEvents.push_back( Event(t, &p, r) ); } return *this; } /*Four receive functions*/ /******************************************************************* * Function Name: receive(InitMessage &) * Description: forwards the (I) msg to the local FC ********************************************************************/ ParallelProcessor &ParallelNodeCoordinator::receive( const InitMessage &msg ) { InitMessage init ( msg.time(), id() ); //forward the I msg to the Local FC this->send(init, localFCId); #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "##### NC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(DoneMessage &) * Description: advance the time and go on with the simulation ********************************************************************/ #ifndef JACKY_RB_EVENT //---------------- ORIGINAL VERSION ---------------------------------- /* ParallelProcessor &ParallelNodeCoordinator::receive( const DoneMessage &msg ){ //(D) msg must be from localFCId (i.e., !isFromSlave()) MASSERTMSG( ( !msg.isFromSlave() ), "NC: (D) msg received from 'slave' instead of 'localFCId'!" ); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] (D) msg received in NC " << this->id() << " at time = " << msg.time().asString() << " / sender = " << msg.procId() << " / nextTime = " << msg.nextChange().asString() << endl << flush; //check the current lVT //Nov. 16, 2005 OOP! VTime initialLVT = getLVT(); jacky_os << "OOP! 
NC lVT = " << initialLVT.asString() << endl << flush; #endif //We have to find out if the (D) msg is a response to a (@) msg or to a (*) msg //1> update the NC's last & next time lastChange( msg.time() ); nextChange( msg.nextChange() ); //2> If we have to send a (*) Message, send it now if ( sendMsgType() == ParallelNodeCoordinatorState::InternalMsg ) { //It should be the case that absoluteNext() == msg.time(); MASSERTMSG( absoluteNext() == msg.time(), "NC" + int2Str(id()) + " trying to send (*) msg to the FC when absoluteNext() != msg.time()!" ); InternalMessage intMsg( absoluteNext(), id()); send ( intMsg, localFCId ); //send (*) to the local FC //Set the next message type to Collect sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " has sent a (*) msg " << " at " << msg.time().asString() << " to the local FC" << localFCId << endl << flush; #endif } else { // sendMsgType() == ParallelNodeCoordinatorState::CollectMsg //3> sendMsgType() == @, let's see if there are any external events or (X) msgs from other NCs // need to be sent before the (@) msg //a> first find the MIN time among: // 1> the external events in the EventList, // 2> the (X) msgs from other NCs in the NCMessageBag, // 3> the absoluteNext() of the received (D) msg VTime minTime = calculateMinTime(); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " MINTime = " << minTime.asString() << endl << flush; #endif //Should we go on? //Note: when minTime > stopTime(), we should stop sending out msgs! if( minTime > stopTime() ){ //this NC is now in the dormancy state! no message should be send out now. //The "dormant" flag in the state should be set to TRUE now //But it still has to wait for other NCs to finish. //It may receive (X) msgs from other NCs. In this case, the NC is activated. 
//The (X) msgs will be added to the NCBag if no rollback is needed, and the //receive(BasicExternalMessage *) will check the "doramnt" flag, if true, //that function will send the (X) msgs out to the FC; //A rollback will happen on this NC if a straggler (X) msg is received isDormant(true); //set the flag //Sept. 28, 2005 //The (D) msg has been received in this NC. There are no external events //that need to be sent out (they may have already been sent out before now, and //still stay on the inputQ unprocessed! ) and there are no (X) msg in the NCBag //from other NCs that need to be sent (Again, these msgs may stay on the inputQ //unprocessed! ) //The NC will not send out further messages. //For case 1: the unprocessed external events on the inputQ will be scheduled by the // event scheduler automatically //For case 2: the receive(Xmsg) function will add them into the NCBag, and check // whether ALL events with the smallest time stamp on the inputQ have already // been processed. If all events with the smallest time stamp on the inputQ // have been processed, that function will send out the (X) msgs // in the NCBag, and the simulation carries on) } else { //b> We have several possiblities here: // 1> minTime = currentEventTime OR bagTime < absoluteNext() // -> in this case, we have some external events OR (X) msgs from other NCs need to be sent out // and it's not yet the time to trigger internalTransition // 2> minTime = absoluteNext() < currentEventTime AND bagTime // -> in this case, we need to trigger the model's output function first by sending a (@) msg // 3> minTime = currentEventTime OR bagTime = absoluteNext() // -> in this case, there are some external events (either from the environment or // from other NCs) that occur at the same time as the next InternalTransition. 
// We need to send the extenal events as well as a (@) msg // #ifdef JACKY_DEBUG //print out the current EventList jacky_os << "before: CHECK THE CURRENT EVENT LIST --> " << endl << flush; showEvents(events(), jacky_os); #endif //send out all the external events with this MIN time while ( !endOfEvents() && (minTime == (currentEvent()->time)) ) { Event ev = *(currentEvent()); //the current event //Note: we should only send out external events that will influence a local simulator! // If an external event has nothing to do with a local simulator, we will ignore // it (it will be handled by other NCs) if(isSendingToLocalFC(*ev.port)){ //this event will be received by a local simulator ExternalMessage extMsg( ev.time, id(), *ev.port, ev.value.value() ); extMsg.senderModelId( model().id() ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " : sends EXTERNAL EVENT as (X) msg [time = " << extMsg.time() << " / senderId = " << id() << " / destPort = " << extMsg.port().name() << "@" << extMsg.port().model().asString() << " / value = " << extMsg.value() << "] to " << localFCId < " << endl << flush; showEvents(events(), jacky_os); //check the lVT again after sending all external events //Nov. 16, 2005 OOP! jacky_os << "OOP! 
after sending events: NC lVT = " << getLVT().asString() << endl << flush; #endif //send out all (X) msgs in the NCMessageBag with this MIN time if( (getNCBag().size() > 0) && (minTime == getNCBag().time()) ){ //get these (X) msgs MessageBag::MessageList xMsgList = getNCBag().getMessageWithMinTime(); //int originalSize; #ifdef JACKY_DEBUG int originalSize = xMsgList.size(); jacky_os << "$$$$[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " NCBag has {" << originalSize << "} messages with min time (" << minTime.asString() << ")" << endl << flush; jacky_os << "$$$$ current NCBag is:" << endl << flush; jacky_os << getNCBag() << endl << flush; #endif MessageBag::MessageList::const_iterator xMsgCursor = xMsgList.begin(); for( ; ( xMsgCursor != xMsgList.end() ) && ( minTime == getNCBag().time() ); xMsgCursor++ ) { //for each (X) msg in the NCMsgBag with the same MIN time, sends it to the FC //Note: // if send(extMsg, localFCId) triggers a rollback on the FC and the resulting // secondary rollbacks happen on this NC, the NCMessageBag may be cleaned by // function rollbackProcessorVariables(), and thus the pointers in the xMsgList // are deleted (invalidated)! Therefore, we have to check this here!!! // Oct. 26, 2005 // const BasicMsgValue* pmsgVal = (*xMsgCursor)->value(); ExternalMessage extMsg( (*xMsgCursor)->time(), id(), (*xMsgCursor)->port(), ((RealMsgValue*)(pmsgVal))->v ); const BasicExternalMessage* xmsgPtr = dynamic_cast(*xMsgCursor); extMsg.senderModelId( xmsgPtr->senderModelId() ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " sends out (X) msg in NCBag / time = " << ((*xMsgCursor)->time()).asString() << " / senderId = " << id() << " / port = " << ((*xMsgCursor)->port()).name() << " / value = " << ((RealMsgValue*)(pmsgVal))->asString() << " / senderModelId = " << xmsgPtr->senderModelId() << endl << flush; #endif //Note: IMPORTANT!!! 
This send() is going to forward (X) from other NCs to the // local FC, and this may result in a rollback on the local FC!!! // After the possible rollback, the state of this NC may be restored to // a previously stored state! Therefore, we cannot take (sendMsgType() == // ParallelNodeCoordinatorState::CollectMsg) as granted!!! //Oct. 19, 2005 Jacky send( extMsg, localFCId ); } //end for #ifdef JACKY_DEBUG //check the lVT again after sending all (X) msgs in the NCBag //Nov. 16, 2005 OOP! jacky_os << "OOP! after sending (X) in NCBag: NC lVT = " << getLVT().asString() << endl << flush; #endif if( minTime == getNCBag().time() // && (getNCBag().getMessageWithMinTime().size() == originalSize) //Nov. 5, 2005 ){ //set the flag //rollbackTriggeredByStraggler = true; #ifdef JACKY_DEBUG jacky_os << "BEFORE Removing minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #endif //clear the (X) msgs in the bag we have sent getNCBag().removeMessageWithMinTime(); #ifdef JACKY_DEBUG jacky_os << "AFTER RemovING minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #endif } //else { // //} } //end if //If we have also an internal transition, then send the Collect message if (minTime == absoluteNext() // For the reason given in the above Note, we need to check the msgType again here! // Don't remove the following condition! It is NOT redundant! // Oct. 19, 2005 && (sendMsgType() == ParallelNodeCoordinatorState::CollectMsg) //check again!!! 
) { //MinTime = absoluteNext(last + next) & nextMsgTyep = @ (in the current state) CollectMessage collectMsg = CollectMessage( minTime, id() ); //Set the next message type to (*) sendMsgType( ParallelNodeCoordinatorState::InternalMsg ); send ( collectMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " sends out (@) msg with time = " << absoluteNext().asString() << endl << flush; //check RB //Nov. 16, 2005 VTime currentLVT = getLVT(); jacky_os << "OOP! after sending @ msg, NC currentLVT = " << currentLVT.asString() << flush; if( !(currentLVT == initialLVT) ) { //LVT changed, RB happened! jacky_os << " // RB happened: " << initialLVT.asString() << " => " << currentLVT.asString() << endl << flush; } else { jacky_os << endl << flush; } #endif } else if ( //Since the NC may be rolled back to a previous state, we cannot assume that the //sendMsgType() is still @! Thus, we need to check again here! // Don't remove the following condition! It is NOT redundant! // Oct. 19, 2005 sendMsgType() == ParallelNodeCoordinatorState::CollectMsg //check again!!! ) { //2> MinTime != absoluteNext(last + next) & nextMsgTyep = @ (in the current state) //Set the time for next change to now. //lastChange( minTime ); //nextChange( VTime::Zero ); //Set the next message type to (@) sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); InternalMessage intMsg( minTime, id()); send ( intMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " sends out (*) msg with time = " << minTime.asString() << endl << flush; //check RB //Nov. 16, 2005 VTime currentLVT = getLVT(); jacky_os << "OOP! after sending * msg, NC currentLVT = " << currentLVT.asString() << flush; if( !(currentLVT == initialLVT) ) { //LVT changed, RB happened! 
jacky_os << " // RB happened: " << initialLVT.asString() << " => " << currentLVT.asString() << endl << flush; } else { jacky_os << endl << flush; } #endif } else { //rollbacks triggered by straggler (X) messages must have happened! //MASSERTMSG( rollbackTriggeredByStraggler == true, // "NC::receive(DoneMessage) -> Detect straggler rollbacks failed!" ); //reset the flag //rollbackTriggeredByStraggler = false; //3> (MinTime = absoluteNext(last + next) & nextMsgTyep != @ ) //OR (MinTime != absoluteNext(last + next) & nextMsgTyep != @) //In this case, the nextMsgType has been changed! A rollback must have happended //when sending (X) in the NCBag to the FC! //That is, a [+] straggler message sending fron this NC to the FC has triggered //rollbacks throughout all processors on this machine. The state of this NC has been //restored to a previous state which has a different nextMsgType (i.e. nextMsgType in //the current state has been restored to *!) // Oct. 20, 2005 //No more message should be sent out from this NC in this case. #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " ROLLBACK HAPPENED! NO @ or * MSG WILL BE SENT TO THE FC!" << endl << flush; //check RB //Nov. 16, 2005 VTime currentLVT = getLVT(); jacky_os << "OOP! RB must have happended! NC currentLVT = " << currentLVT.asString() << flush; if( !(currentLVT == initialLVT) ) { //LVT changed, RB happened! jacky_os << " // RB happened: " << initialLVT.asString() << " => " << currentLVT.asString() << endl << flush; } else { jacky_os << endl << flush; } #endif // Jacky Note: Nov. 4, 2005 // If sending a (X) message to the FC from the NCBag caused rollbacks to happen, we // need to do the following things in order to maintain the inputQ, stateQ, and outputQ // of the FC, and NC: // 1. The extra state corresponding to this receive(DoneMessage) should not be saved. // This is done by setting the skipStateSaving flag to true. // 2. 
The straggler positive (X) message should be removed from the inputQ of the FC. // This straggler message can be found on the outputQ of this NC. As mentioned in // the following point, it is the LAST message on this NC's outputQ. // 3. The same message should be removed from the outputQ of this NC. This message // should be the LAST message on the current outputQ. // #ifdef JACKY_DEBUG //let's print out the current outputQ of this NC outputQ.printOutputQ( jacky_os ); #endif cerr << "!!!Straggler (NC" << this->id() << ")!!!" << endl; skipStateSaving = true; //now, we need to remove this straggler event from the outputQ of this NC BasicEvent* stragglerEvent = dynamic_cast( outputQ.remove( outputQ.getTail() ) ); #ifdef JACKY_DEBUG if( stragglerEvent != NULL ){ //current straggler event found TWMessage* stragglerMsg = (TWMessage*)stragglerEvent; jacky_os << "(((((( EXCEPTION CHECK!!! stragglerEvent -> id=" << stragglerEvent->eventId << " sign=" << stragglerEvent->sign << " " << stragglerEvent->sender << "@" << stragglerEvent->sendTime << " =" << stragglerMsg->getMessage()->type() << "=> " << stragglerEvent->dest << "@" << stragglerEvent->recvTime << " Processed?" << stragglerEvent->alreadyProcessed << endl << flush; } else { jacky_os << "(((((( EXCEPTION CHECK!!! stragglerEvent -> NULL!!! THIS IS STRANGE!!!! WHY???" << endl << flush; } //print outputQ after removal of the straggler event jacky_os << endl << "After removal of straggler, outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); //cerr << "!!!!!!!!!!!!! Attention: Straggler removed from NC outputQ !!!!!" 
<< endl; #endif //also remove the strggler from the inputQ of the receiving FC //This is much like an message implosion //1> get the FC's localId int FC_localId = SingleParallelProcessorAdmin::Instance().processor(localFCId).localId; #ifdef JACKY_DEBUG jacky_os << endl << "((((( FC localId = " << FC_localId << " )))))" << endl << flush; #endif //2> call removeStragglerEvent() on the inputQ BasicTimeWarp::inputQ.removeStragglerEvent(stragglerEvent, FC_localId); } } //end minTime > stopTime() } //end sendMsgType() == @ return *this; } */ #else //------------------ NEW VERSION (JACKY_RB_EVENT) ------------------------------------ ParallelProcessor &ParallelNodeCoordinator::receive( const DoneMessage &msg ){ //(D) msg must be from localFCId (i.e., !isFromSlave()) MASSERTMSG( ( !msg.isFromSlave() ), "NC: (D) msg received from 'slave' instead of 'localFCId'!" ); VTime initialLVT = getLVT(); //this should be the recvTime of the received (D) message #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "-=-=-=[====OOP====] (D) msg received in NC " << this->id() << " initialLVT = " << initialLVT.asString() << endl << flush; #endif //We have to find out if the (D) msg is a response to a (@) msg or to a (*) msg //1> update the NC's last & next time lastChange( msg.time() ); nextChange( msg.nextChange() ); #ifdef JACKY_SYNC_TIME0 //[2006-01-23] ================================================================================= //synchronize all LPs at time 0 after getting a (D) message corresponding to the (@) message //At this time, all atomic model simulator's outputFunction have been invoked, and all (Y) //messages have been sent to other LPs #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- //[2006-03-15] //Note: if this is in coast forward operations, we should NOT do MPI_Barrier!! 
//getSuppressMessage() can return one of the following int values: // NONE -> Cancel msg Aggressively [0] // LAZYCANCEL -> Lazy Cancellation Suppression [1] // COASTFORWARD -> CoastForward Suppression [2] // LAZYAGGRCANCEL -> Dynamic cancellation [3] if( getSuppressMessage() != 2 ){ #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] -------------------------------------- //do MPI_Barrier if in normal execution! if( (msg.time() == VTime::Zero) && (msg.nextChange() == VTime::Zero) && (sendMsgType() == ParallelNodeCoordinatorState::InternalMsg) ) { //cerr << "\tNC" << id() << " enter Barrier @ " << time(NULL) << endl; #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- #ifdef JACKY_DEBUG jacky_os << endl << "==== suppressMessage = " << getSuppressMessage() << " ====" << endl << flush; #endif #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] -------------------------------------- #ifdef JACKY_DEBUG jacky_os << "===================== NC enters MPI_Barrier! ========================" << endl << endl << flush; #endif //call synchronizeInitialization() in the LP getLPHandle()->synchronizeInitialization(); //MPI_Barrier!!! //cerr << "\tNC" << id() << " leave Barrier @ " << time(NULL) << endl; } //end MPI_Barrier #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- } //end if getSuppressMessage() != 2 #ifdef JACKY_DEBUG else { //if getSuppressMessage() == 2 //we are in CoastForward Operation, don't do MPI_Barrier since no msg will be send out in CF! jacky_os << endl << "[Note: NC " << this->id() << " in CoastForward -> No MPI_Barrier here!!!] 
" << endl << flush; } #endif #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] -------------------------------------- #endif //end JACKY_SYNC_TIME0 =========================================================================================== //2> If we have to send a (*) Message, send it now if ( sendMsgType() == ParallelNodeCoordinatorState::InternalMsg ) { //It should be the case that absoluteNext() == msg.time(); MASSERTMSG( absoluteNext() == msg.time(), "NC " + int2Str(id()) + " trying to send (*) msg to the FC when absoluteNext() != msg.time()!" ); InternalMessage intMsg( absoluteNext(), id()); send ( intMsg, localFCId ); //send (*) to the local FC //Set the next message type to Collect sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends (*) msg " << " at " << msg.time().asString() << " to FC " << localFCId << endl << flush; #endif } else {//sendMsgType() == @ //find the MIN time among: // 1> the external events in the EventList, // 2> the (X) msgs from other NCs in the NCMessageBag, // 3> the absoluteNext() of the received (D) msg //Nov. 16, 2005 -> //Note: ALL external events are taken into consideration when calculating the MINtime, // But there maybe some (X) messages from other NCs do not participate in the // calculation since they have not arrived or have arrived but have not yet been // processed (i.e. put in the current NCBag) VTime minTime = calculateMinTime(); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " MINTime = " << minTime.asString() << endl << flush; #endif //Should we go on? when minTime > stopTime(), we should stop sending out msgs! if( minTime > stopTime() ){ //this NC is now in the dormancy state! no message should be send out now. //The "dormant" flag in the state should be set to TRUE now //But it still has to wait for other NCs to finish. //It may receive (X) msgs from other NCs. In this case, the NC is activated. 
//The (X) msgs will be added to the NCBag if no rollback is needed, and the //receive(BasicExternalMessage *) will check the "doramnt" flag, if true, //that function will send the (X) msgs out to the FC; //A rollback will happen on this NC if a straggler (X) msg is received isDormant(true); //set the flag //Sept. 28, 2005 //The (D) msg has been received in this NC. There are no external events //that need to be sent out (they may have already been sent out before now, and //still stay on the inputQ unprocessed! ) and there are no (X) msg in the NCBag //from other NCs that need to be sent (Again, these msgs may stay on the inputQ //unprocessed! ) //The NC will not send out further messages. //For case 1: the unprocessed external events on the inputQ will be scheduled by the // event scheduler automatically //For case 2: the receive(Xmsg) function will add them into the NCBag, and check // whether ALL events with the smallest time stamp on the inputQ have already // been processed. If all events with the smallest time stamp on the inputQ // have been processed, that function will send out the (X) msgs // in the NCBag, and the simulation carries on) } else { //minTime <= stopTime && msgType = @ //We have several possiblities here: // 1> minTime = currentEventTime OR bagTime < absoluteNext() // -> in this case, we have some external events OR (X) msgs from other NCs need to be sent out // and it's not yet the time to trigger internalTransition // 2> minTime = absoluteNext() < currentEventTime AND bagTime // -> in this case, we need to trigger the model's output function first by sending a (@) msg // 3> minTime = currentEventTime OR bagTime = absoluteNext() // -> in this case, there are some external events (either from the environment or // from other NCs) that occur at the same time as the next InternalTransition. 
// We need to send the extenal events as well as a (@) msg // sendMinTimeEventsToFC( minTime ); //all possible external events with minTime sent MASSERTMSG( ( initialLVT == getLVT() ), "NC: sending EXTERNAL EVENTS has caused rollbacks! Should Never Happen!" ); //--------------------- enternal events done ------------------------- //record the number of (X) msgs with the MINtime //Note: this number will be used to remove all (multiple) straggler messages from the NC's outputQ // and the FC's inputQ. unsigned int numOfMinTimeXmsgs = sendMinTimeNCBagMsgsToFC( minTime ); //--------------------- (X) messages in NCBag sent ------------------------- //Before we remove the (X) mssg in the NCBag and continue, we have to check rollbacks! //Nov. 16, 2005 //We get the LVT from the current state again. There are 2 possibles: // CASE 1: currentLVT = initialLVT // The LVT in the current state has not changed after sending the external events // and (X) messages in the NCBag. There is NO rollback happened on the NC. In this // case, we may proceed normally. // CASE 2: currentLVT != initialLVT (specifically, currentLVT < initialLVT) // The LVT in the current (restored during rollbacks that have happened) state is // less than the initial LVT. Rollbakcs must have happened on this NC. In this case, // there are 2 further situations: // // a> The (X) messages in the NCBag with MINtime have been removed during the rollbacks. // i.e. the NCBag is empty now and the time of the NCBag is INF. Since the (X) messages // have been removed, there is only ONE straggler message has been sent to the FC // (inserted) in the FC's inputQ). We need to remove this straggler message from the // NC's outputQ and the inputQ. // // b> The (X) messages in the NCBag with MINtime have been left there. They are not // removed. // In this case, the rollbackTime for the NC must > the MINtime. Thus, in the previous // FOR loop, there are multiple straggler (X) messages have been sent to the FC. 
// All these straggler // messages must be removed from the NC's outputQ and the FC's inputQ // For CASE 1, we must skip state saving for this cycle since the state of the NC has been // restored. It has the content of a previous state but strange inputPos and outputPos values. // For CASE 2, we will save the state. VTime currentLVT = getLVT(); if( currentLVT == initialLVT ){ //NO rollbacks happened #ifdef JACKY_DEBUG jacky_os << "-=-=-=[====OOP====] Normal (No rollback)" << endl << flush; jacky_os << "BEFORE Removing minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << "2006-02-03 Current NCBagReference --->" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif #ifdef JACKY_NCBAG_POINTERS //[2006-02-03]----------------------------------------------------------------- //remove elements in the NCBagReference with key = NCBag.time() getBagRef().erase( getNCBag().time() ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03]------------------------------------------------------------- getNCBag().removeMessageWithMinTime(); //clear the (X) msgs in the bag we have sent #ifdef JACKY_DEBUG jacky_os << "AFTER RemovING minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << "2006-02-03 Current NCBagReference --->" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif //now sends @/* msg if (minTime == absoluteNext()){ //have imminents, send @ CollectMessage collectMsg = CollectMessage( minTime, id() ); sendMsgType( ParallelNodeCoordinatorState::InternalMsg ); //Set Type to * send ( collectMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends (@) msg with time = " << absoluteNext().asString() << endl << flush; #endif } else { 
//No imminents, send * InternalMessage intMsg( minTime, id()); sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); //Set Type to @ send ( intMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends (*) msg with time = " << minTime.asString() << endl << flush; #endif } } else { //Rollback happened! //how many straggler (X) messages we have sent? //A> Only ONE straggler message has been sent // A [+] straggler message sending fron this NC to the FC has triggered rollbacks // throughout all processors on this machine. The state of this NC has been // restored to a previous state. All (X) messages in the NCBag have been removed. // No more message should be sent out from this NC in this case. if( getNCBag().size() == 0 ) { //all NCBag msgs removed, rollbackTime <= prev NCBag.time #ifdef JACKY_DEBUG jacky_os << "-=-=-=[====OOP====]NC " << this->id() << " ROLLBACK HAPPENED! [" << initialLVT.asString() << " => " << currentLVT.asString() << "] [ONE straggler] NO @/* MSG WILL BE SENT TO THE FC!" << endl << flush; #ifndef ONE_ANTI_MESSAGE //[2006-03-09] //Jacky Note: //In the case of One-Anti-Message, the straggler event has been deleted from the //FC's inputQ. This is a strange situation where the Container created for the //straggler is saved on the current NC's outputQ, but its "object", i.e. the //straggler event, has been deleted! //At this time, printing the outputQ will cause (SIGSEGV 11) error since the //straggler event has been deleted! outputQ.printOutputQ( jacky_os ); //print out current NC's outputQ #endif //end ONE_ANTI_MESSAGE [2006-03-09] #endif //end JACKY_DEBUG //currentLVT < MINtime MASSERTMSG( ( currentLVT < minTime ), "NC: Rollback, ALL msgs in NCBag removed. But currentLVT >= minTime!" ); // If sending a (X) message to the FC from the NCBag caused rollbacks to happen, we // need to do the following things in order to maintain the inputQ, stateQ, and // outputQ of the FC, and NC: // 1. 
The extra state corresponding to this receive(DoneMessage) should not be saved. // This is done by setting the skipStateSaving flag to true. // 2. The same message should be removed from the outputQ of this NC. This message // should be the LAST message on the current outputQ. // 3. The straggler positive (X) message should be removed from the inputQ of the FC. // This straggler message can be found on the outputQ of this NC. // This is much like an message implosion. // // ----SHOW MESSAGE---- //cerr << "\t\t[" << ParallelMainSimulator::Instance().getMachineID() // << "] ONE Straggler! (NC " << this->id() << ")!!!" << endl; skipStateSaving = true; //1> #ifndef ONE_ANTI_MESSAGE //[2006-03-09] ------------------------------------------------------------------------------------ //this block of code is for NOT using ONE_ANTI_MESSAGE BasicEvent* stragglerEvent = dynamic_cast( outputQ.remove( outputQ.getTail() ) ); //2> #ifdef JACKY_DEBUG if( stragglerEvent != NULL ){ //current straggler event found TWMessage* stragglerMsg = (TWMessage*)stragglerEvent; Message* stragglerCDMsg = stragglerMsg->getMessage(); jacky_os << endl << "((( ONE STRAGGLER CHECK!!! stragglerEvent -> id=" << stragglerEvent->eventId << " sign=" << stragglerEvent->sign << " " << stragglerEvent->sender << "@" << stragglerEvent->sendTime << " =" << stragglerCDMsg->type() << "=> " << stragglerEvent->dest << "@" << stragglerEvent->recvTime << " Processed?" << stragglerEvent->alreadyProcessed << " )))" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete stragglerCDMsg; #endif } else { jacky_os << endl << "((( ONE STRAGGLER CHECK!!! stragglerEvent -> NULL! STRANGE! 
)))" << endl << flush; } //print outputQ after removal of the straggler event jacky_os << endl << "After removal of straggler, NC outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); #endif //Jacky Note: //If ONE_ANTI_MESSAGE is used, the straggler [+] msg has already been removed from //the FC's inputQ, so we should not remove the straggler again! //[2006-03-09] //get the FC's localId int FC_localId = SingleParallelProcessorAdmin::Instance().processor(localFCId).localId; #ifdef JACKY_DEBUG jacky_os << endl << "((( FC localId = " << FC_localId << " )))" << endl << flush; #endif //call removeStragglerEvent() on the inputQ BasicTimeWarp::inputQ.removeStragglerEvent(stragglerEvent, FC_localId); #else //ONE_ANTI_MESSAGE [2006-03-09] ------------------------------------------------------------------------------------ //this block of code is for ONE_ANTI_MESSAGE! //Only remove the last element on the NC's outputQ, straggler has been deleted //from FC's inputQ, so don't delete the straggler again! //this will delete the last Container on NC's outputQ outputQ.remove(outputQ.getTail()); //2> //we don't need to get the straggler since no need to delete it from FC's inputQ #ifdef JACKY_DEBUG //print outputQ after removal of the straggler event jacky_os << endl << "[One-Anti-message] CHECK one-straggler NC outputQ!" << endl << endl << flush; cerr << "NC " << this->id() << " CHECK one-straggler NC outputQ!" << endl; jacky_os << endl << "After removal of straggler, NC outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); #endif //end JACKY_DEBUG #endif //end ONE_ANTI_MESSAGE [2006-03-09] ------------------------------------------------------------------------------- } //B> There are MULTIPLE (numOfMinTimeXmsgs) straggler messages need to be sent // Multiple straggler messages have been inserted into the FC's inputQ. // Since the first straggler has sendTime > recvTime, the order of these straggler // messages on the NC's outputQ is wrong! 
We need to do the following things in // order to correct this problem: // 1. The straggler messages should be removed from the outputQ of this NC. // These messages are the LAST numOfMinTimeXmsgs messages on the current outputQ. // 2. The straggler [+] (X) messages should be removed from the inputQ of the FC. // This is much like message implosions. // 3. Then, we will resend these messages // Note: (Nov. 24, 2005) // Before we resend messages (resending these messages is like INSERTing simulations // cycles), we need to clean up previous jump-messages and the previous state that have // been saved on the NC's stateQ. After the rollbacks, this state should be the LAST // state on the queue and the restored current state! else { //MINtime msgs left in NCBag, rollbackTime > prev NCBag.time //raise error if time of NCBag OR number of MINtime msgs changed MASSERTMSG( ( (minTime == getNCBag().time()) && (numOfMinTimeXmsgs == getNCBag().getMessageWithMinTime().size()) ), "NC: Rollback, multiple straggler. But BagTime/numOfMinTimeXmsgs changed!" ); #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------------------ jacky_os << endl << "-=-=-=[====OOP====]NC " << this->id() << " ROLLBACK HAPPENED! [" << initialLVT.asString() << " => " << currentLVT.asString() << "] [MULTIPLE stragglers]!" << endl << endl << flush; outputQ.printOutputQ( jacky_os ); //print out current NC's outputQ #endif //------------------------------------------------------------------------------------------------------------------ //----SHOW MSG---- //cerr << "\t\t\t\t\t\t\t[~~" << ParallelMainSimulator::Instance().getMachineID() // << "~~] Multiple Stragglers! (NC " << this->id() << ")!!!" 
<< endl; //get the FC's localId int FC_localId = SingleParallelProcessorAdmin::Instance().processor(localFCId).localId; #ifdef JACKY_DEBUG //----------------------------------------------------------------------------------------------------- jacky_os << endl << "((( FC localId = " << FC_localId << " )))" << endl << flush; //isNC = true; //Nov. 22, 2005 set the rollbackCheckTimes flag #endif //----------------------------------------------------------------------------------------------------------------- //1> remove all straggler msgs we have sent from NC's outputQ & FC's inputQ for( unsigned int i = 0; i < numOfMinTimeXmsgs; i++ ){ BasicEvent* stragglerEvent = dynamic_cast( outputQ.remove( outputQ.getTail() ) ); #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------------------ if( stragglerEvent != NULL ){ //current straggler event found TWMessage* stragglerMsg = (TWMessage*)stragglerEvent; Message* stragglerCDMsg = stragglerMsg->getMessage(); jacky_os << "((( MULTIPLE STRAGGLERS CHECK!!! stragglerEvent -> id=" << stragglerEvent->eventId << " sign=" << stragglerEvent->sign << " " << stragglerEvent->sender << "@" << stragglerEvent->sendTime << " =" << stragglerCDMsg->type() << "=> " << stragglerEvent->dest << "@" << stragglerEvent->recvTime << " Processed?" << stragglerEvent->alreadyProcessed << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete stragglerCDMsg; #endif } else { jacky_os << "((( MULTIPLE STRAGGLERS CHECK!!! stragglerEvent" << " -> NULL!!! STRANGE!!!!" 
<< endl << flush; } #endif //------------------------------------------------------------------------------------------------------------------ //call removeStragglerEvent() on the inputQ BasicTimeWarp::inputQ.removeStragglerEvent(stragglerEvent, FC_localId); } //end For #ifdef JACKY_DEBUG //------------------------------------------------------------------------------------------------------ jacky_os << endl << "After removal of stragglers, NC outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); //Following is the most difficult part! //We have to do the following: //Since we have been rolled back to the current minTime, the current state is a copy //of the one that have been saved at the end of the minTime! //i.e. currentState is restored! //We need to check whether there are (X), (@) and/or (*) messages that have already //been sent out and unprocessed during the rollbacks. If there are those messages on //the inputQ, we have to remove them (this also requires correctly recover the NC's //stateQ, NC's outputQ) //print the current inputQ for the FC jacky_os << endl << "-=-=-=[====OOP====] FC inputQ -=-=-=[====OOP====]" << endl << flush; BasicTimeWarp::inputQ.printMiniInputQ( jacky_os, FC_localId ); jacky_os << endl << "-=-=-=[====OOP====] NC inputQ -=-=-=[====OOP====]" << endl << flush; BasicTimeWarp::inputQ.printMiniInputQ( jacky_os, this->localId ); //jacky_os << endl << "-=-=-=[====OOP====] Whole inputQ -=-=-=[====OOP====]" // << endl << flush; //BasicTimeWarp::inputQ.printFullInputQ( jacky_os ); jacky_os << endl << endl << flush; #endif //------------------------------------------------------------------------------------------------------------------ //2> Before resending the (X) messages in the NCBag, we need to restore the NC's // outputQ, FC's inputQ and the NC's stateQ to their previous content, content // before the event-jump. 
//For example, if there was an event-jump from 2000 to 4000, we have sent two //messages [1] NC@2000 -X-> FC@4000, and [2] NC@2000 -*-> FC@4000 during the //previous cycle when we received a (D) message from the FC at time 2000. A state //have been saved after sending these 2 messages on the NC's stateQ. Now, after //processing these 2 jump-messages, FC sent a (D) message to the NC at time 4000. //But the NC found that there are newly arrived (X) messages in the NCBag at time //3000. Thus, the NC insert the 1st (X) message with recvTime = 3000 into the FC's //inputQ, and this caused rollbacks. Since we do NOT save states after invoking //receive(Xmsg), the NC should have been rolled back to time 2000, which is the //LAST state on the NC's stateQ and this state records those 2 jump-messages the NC //have sent before. //At this stage, we need to remove the last state from the NC's stateQ, all messages //that we have sent before saving this state from the NC's outputQ as well as the //FC's inputQ. i.e. we redo the receive(D) function at time 2000, and now we have //minTime = 3000, which is the time of (X) messages in the current NCBag. //Also, we need to reset the eventCursor of the EventList to rollback the external //events that we have sent at time 2000. //a> get the unprocessed @ or * message from the FC's inputQ BasicEvent* futureCIEvent = findUnprocessedCollectORInternalMessageAfter(minTime); //b> recover the NC's stateQ, outputQ, and FC's inputQ if ( futureCIEvent != NULL ) { #ifndef JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent //[2006-02-13] ================== previous version ================== //Note: in this previous version, the Containers on NC's outputQ are found based on the outputPos of the stateFound and //the state before the stateFound on NC's stateQ. Since one or both of these states may be garbage collected when we //find them, their outputPos may be NULL. This will result in runtime exception! 
//Hence, we define new functions for finding the future events and the outputQ Containers directly from the inputQ //based on the futureCIEvnt. //[new version starts from line 1669] #ifndef JACKY_NC_GET_STATE_FROM_TAIL //[2006-02-07]-- original version -> find the state by outputPos=futureCIEvnt -- //get the states whose outputPos pointing to this event list statesToBeChanged = state->findAllStateWithOutputPos(futureCIEvent); unsigned int numOfStates = statesToBeChanged.size(); //----SHOW MSG---- //cerr << "\t\t\t\t\t\t\tSTATES FOUND !!!" << endl; #ifdef JACKY_DEBUG list::const_iterator stateCursor; jacky_os << "[~~SEE~~] STATES FOUND:" << endl << flush; stateCursor = statesToBeChanged.begin(); while(stateCursor != statesToBeChanged.end()){ jacky_os << "[====OOP====] --> " << (*stateCursor) << endl << flush; stateCursor++; } #endif //there should be only ONE state found MASSERTMSG( ( numOfStates == 1 ), "Check! NC: more than one state found for futureCIEvent! (" + int2Str(numOfStates) + ") states found!" ); BasicState* stateFound = statesToBeChanged.front(); //also the state should be the TAIL of the current stateQ MASSERTMSG( ( stateFound == state->getStateQueue()->getTail() ), "Check! NC: state found for futureCIEvent is not TAIL!" ); #else //[2006-02-07]--- new version to get the state directly from the tail of the NC's stateQ ---------------------- //directly get the tail of the NC's stateQ BasicState* stateFound = state->getStateQueue()->getTail(); if( stateFound != NULL ){ #ifdef JACKY_DEBUG jacky_os << "[~~SEE~~] STATES FOUND [" << stateFound << "] " << (*stateFound) << endl << flush; if( stateFound->outputPos != NULL && stateFound->outputPos->object != futureCIEvent ){ jacky_os << endl << "NC: stateFound->outputPos != futureCIEvent !" 
<< endl << flush; //print current stateQ and outputQ jacky_os << endl << "[~~SEE~~] NC stateQ [~~SEE~~]" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; jacky_os << endl << "[~~SEE~~] NC outputQ [~~SEE~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; } #endif //test whether its outputPos is the futureCIEvent if( stateFound->outputPos != NULL ){ MASSERTMSG( (stateFound->outputPos->object == futureCIEvent), "NC: stateFound->outputPos != futureCIEvent !" ); } else { cerr << "stateFound->outputPos == NULL" << endl; cerr << "\t Current NC " << this->id() << " time: " << minTime.asString() << " / stateFound time: " << (stateFound->lVT).asString() << endl; cerr << "\t GVT = " << (this->gVTHandle->getGVT()).asString() << endl; #ifdef JACKY_DEBUG jacky_os << "2006-02-07 STATES FOUND [" << stateFound << "] " << (*stateFound) << endl << flush; jacky_os << "2006-02-07 state.outputPos = NULL!" << endl << flush; //print current stateQ and outputQ jacky_os << endl << "[~~SEE~~] NC stateQ [~~SEE~~]" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; jacky_os << endl << "[~~SEE~~] NC outputQ [~~SEE~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; #endif //stateFound's outputPos = NULL && inputPos != NULL //let's find out this and what's in the current state cerr << "stateFound = " << stateFound << "[" << (*stateFound) << "]" << endl; cerr << "current state = [" << *(state->current) << "]" < size = " << state->getStateQueue()->size() << endl; //state->printQ(cerr); cerr << endl << "NC::outputQ -> size = " << outputQ.size() << endl; //outputQ.printOutputQ( cerr ); //cerr << endl << "NC::inputQ ->" << endl; //BasicTimeWarp::inputQ.printMiniInputQ(cerr, this->localId); //cerr << endl << "FC::inputQ ->" << endl; //BasicTimeWarp::inputQ.printMiniInputQ(cerr, FC_localId); MASSERTMSG( false, "NC: stateFound->outputPos == NULL !" 
); } //end stateFound->outputPos != NULL } else { //stateFound == NULL cerr << "NC -> state found is NULL!" << endl; cerr << "Current NC.stateQ size = " << state->getStateQueue()->size() << endl; #ifdef JACKY_DEBUG jacky_os << "State Found is NULL !!!!!" << endl << flush; //print the current NC stateQ jacky_os << endl << "[~~SEE~~] NC stateQ [~~SEE~~]" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; jacky_os << endl << "[~~SEE~~] NC outputQ [~~SEE~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; #endif MASSERTMSG( false, "Check! NC::stateFound = NULL !" ); } //end stateFound != NULL #endif //end JACKY_NC_GET_STATE_FROM_TAIL [2006-02-07] ------------------------------------------------------------- //now, get all messages on the outputQ for this state list< Container* > allOutputMessages = findAllOutputMessagesForState( stateFound ); list< Container* >::const_iterator con_it; #ifdef JACKY_DEBUG jacky_os << "[~~SEE~~] All output messages for stateFound: " << endl << flush; int con_index = 1; for( con_it = allOutputMessages.begin(); con_it != allOutputMessages.end(); con_it++ ){ BasicEvent* tempEvent = (*con_it)->object; TWMessage* tempMessage = (TWMessage*)tempEvent; jacky_os << "[" << con_index << "] " << *((*con_it)->object) << flush; if( tempMessage->derivedFromExternalEvent() == true ){ jacky_os << " [Event = T]" << flush; } jacky_os << endl << flush; con_index++; } #endif //find external events for which we need to rollback the eventCursor list< BasicEvent* > extEventMessages = findMessagesDerivedFromExternalEventsFromContainers(allOutputMessages); int numOfExternalEvents = extEventMessages.size(); if( numOfExternalEvents > 0 ){ //have msgs derived from external events VTime eventTime = extEventMessages.front()->recvTime; #ifdef JACKY_DEBUG jacky_os << "[~~SEE~~] EventTime = " << eventTime.asString() << " / eventMsgNum = " << numOfExternalEvents << endl << flush; list< BasicEvent* 
>::const_iterator extEv_it; int ev_index = 1; for( extEv_it = extEventMessages.begin(); extEv_it != extEventMessages.end(); extEv_it++ ){ TWMessage* tempMessage = (TWMessage*)(*extEv_it); jacky_os << "[" << ev_index << "] " << *(*extEv_it) << flush; if( tempMessage->derivedFromExternalEvent() == true ){ jacky_os << " [Event = T]" << flush; } jacky_os << endl << flush; ev_index++; } #endif //rollback these events rollbackExternalEvents( eventTime ); #ifdef JACKY_DEBUG jacky_os << endl << "[~~SEE~~] After RB, EventList is :" << endl << flush; showEvents( events() , jacky_os ); #endif } else { //No msg derived from external events #ifdef JACKY_DEBUG jacky_os << endl << "Multiple-Straggler, No Events!!!" << endl << flush; #endif } //Now, we will do: //1> remove all "allOutputMessages" from NC's outputQ //2> remove the same messages from FC's inputQ for( con_it = allOutputMessages.begin(); con_it != allOutputMessages.end(); con_it++ ){ //Note: [2006-02-13] //we should only remove containers from the outputQ iff the outputQ //is not empty! BasicEvent* inputQEvent = dynamic_cast( outputQ.remove( (*con_it) ) ); #ifdef JACKY_DEBUG if( inputQEvent != NULL ){ TWMessage* inputQMsg = (TWMessage*)inputQEvent; Message* inputQCDMsg = inputQMsg->getMessage(); jacky_os << endl << "[~~SEE~~] REMOVE inputQEvent -> id=" << inputQEvent->eventId << " sign=" << inputQEvent->sign << " " << inputQEvent->sender << "@" << inputQEvent->sendTime << " =" << inputQMsg->getMessage()->type() << "=> " << inputQEvent->dest << "@" << inputQEvent->recvTime << " Processed?" << inputQEvent->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP // Nov. 22, 2005 if(inputQMsg->derivedFromExternalEvent() == true){ jacky_os << " [Event = T]" << flush; } #endif //end JACKY_EVENT_JUMP Nov. 22, 2005 jacky_os << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete inputQCDMsg; #endif } else { MASSERTMSG( false, "Check! NC: inputQEvent = NULL !" 
); } #endif //end JACKY_DEBUG //call removeStragglerEvent() on the inputQ BasicTimeWarp::inputQ.removeStragglerEvent(inputQEvent, FC_localId); } //end for #else //[2006-02-13] ======================================== new version ================================================ //Note: this is the new version for finding outputQ Containers and BasicEvents derived from external events from the //future events found on the inputQ. All future events are found directly from the inputQ based on the futureCIEvent. //2>directly get the tail of the NC's stateQ BasicState* stateFound = state->getStateQueue()->getTail(); //3> if stateFound = NULL, raise error if( stateFound == NULL ){ cerr << "NC " << this->id() << " -> stateFound = NULL!" << " NC::stateQ size = " << state->getStateQueue()->size() << endl; #ifdef JACKY_DEBUG jacky_os << endl << "[~~~~] stateFound is NULL" << endl << flush; //print the current NC stateQ jacky_os << endl << "[~~~~] NC stateQ [~~~~]" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; jacky_os << endl << "[~~~~] NC outputQ [~~~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; #endif MASSERTMSG( false, "Check! NC::stateFound = NULL !" ); } //4> find all future events sent along with the futureCIEvent //the returned list contains at least one message, i.e. the futureCIEvent list< BasicEvent* > allFutureEvents = findAllFutureEvents( futureCIEvent ); list< BasicEvent* >::const_iterator ev_it; #ifdef JACKY_DEBUG jacky_os << "[~~~~] All future events: " << endl << flush; ev_it = allFutureEvents.begin(); int ev_index = 1; for( ; ev_it != allFutureEvents.end(); ev_it++ ){ BasicEvent* tempEvent = (*ev_it); TWMessage* tempMessage = (TWMessage*)tempEvent; jacky_os << "[" << ev_index << "] " << *tempEvent << flush; #ifdef JACKY_EVENT_JUMP // Nov. 22, 2005 if( tempMessage->derivedFromExternalEvent() == true ){ jacky_os << " [Event = T]" << flush; } #endif //end JACKY_EVENT_JUMP Nov. 
22, 2005 jacky_os << endl << flush; ev_index++; } #endif //5> find all messages derived from external events from allFutureEvents // we need to rollback the eventCursor in EventList for these events list< BasicEvent* > extEventMessages = findMessagesDerivedFromExternalEventsFromBasicEvents(allFutureEvents); int numOfExternalEvents = extEventMessages.size(); if( numOfExternalEvents > 0 ){ //have msgs derived from external events VTime eventTime = extEventMessages.front()->recvTime; #ifdef JACKY_DEBUG jacky_os << "[~~~~] EventTime = " << eventTime.asString() << " / eventMsgNum = " << numOfExternalEvents << endl << flush; ev_it = extEventMessages.begin(); ev_index = 1; for( ; ev_it != extEventMessages.end(); ev_it++ ){ TWMessage* tempMessage = (TWMessage*)(*ev_it); jacky_os << "[" << ev_index << "] " << *(*ev_it) << flush; #ifdef JACKY_EVENT_JUMP // Nov. 22, 2005 if( tempMessage->derivedFromExternalEvent() == true ){ jacky_os << " [Event = T]" << flush; } #endif //end JACKY_EVENT_JUMP Nov. 22, 2005 jacky_os << endl << flush; ev_index++; } #endif //rollback these events rollbackExternalEvents( eventTime ); #ifdef JACKY_DEBUG jacky_os << endl << "[~~~~] After RB, EventList:" << endl << flush; showEvents( events() , jacky_os ); #endif } else { //No msg derived from external events #ifdef JACKY_DEBUG jacky_os << endl << "Multiple-Straggler, No Events!" 
<< endl << flush; #endif } //6> find all output messages on NC's outputQ if they are still there // the returned list is empty if the NC's outputQ is empty list< Container* > allOutputMessages = findAllOutputMessagesForFutureEvents( allFutureEvents ); //7> remove these output messages from NC's outputQ if( allOutputMessages.size() > 0 ) { //the output messages are on the outputQ list< Container* >::const_iterator con_it; #ifdef JACKY_DEBUG //show NC outputQ jacky_os << endl << "[~~~~] Before removal, NC outputQ [~~~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; //show the output messages jacky_os << "--> All output messages for the future events: " << endl << flush; int con_index = 1; for( con_it = allOutputMessages.begin(); con_it != allOutputMessages.end(); con_it++ ){ BasicEvent* tempEvent = (*con_it)->object; TWMessage* tempMessage = (TWMessage*)tempEvent; jacky_os << "[" << con_index << "] " << *tempEvent << flush; if( tempMessage->derivedFromExternalEvent() == true ){ jacky_os << " [Event = T]" << flush; } jacky_os << endl << flush; con_index++; } #endif //remove these output messages con_it = allOutputMessages.begin(); for( ; con_it != allOutputMessages.end(); con_it++ ){ //Note: Containers deleted! //but BasicEvent object in the Container is returned!!! outputQ.remove( (*con_it) ); } #ifdef JACKY_DEBUG //show NC outputQ jacky_os << endl << "[~~~~] After removal, NC outputQ [~~~~] " << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; #endif } else { //allOutputMessages.size() = 0, no output message found on outputQ //Note: the output messages have already been removed by garbage // collector, we need not to remove these output messages cerr << "NC "<id()<<" : allOutputMessages.size = 0!" << endl; if(stateFound->outputPos == NULL){ cerr << "stateFound->outputPos == NULL" << endl; } else { cerr << "STRANGE! 
stateFound->outputPos != NULL" << endl; } cerr << "\t Current NC " << this->id() << " time: " << minTime.asString() << " / stateFound time: " << (stateFound->lVT).asString() << endl; cerr << "\t GVT = " << (this->gVTHandle->getGVT()).asString() << endl; #ifdef JACKY_DEBUG jacky_os << "NC "<id()<<" : allOutputMessages.size = 0!" << endl << flush; jacky_os << "2006-02-13 STATES FOUND [" << stateFound << "] " << (*stateFound) << endl << flush; if(stateFound->outputPos == NULL){ jacky_os << "state.outputPos = NULL!" << endl << flush; } else { jacky_os << "STRANGE! state.outputPos != NULL!" << endl << flush; } //print the stateQ and outputQ jacky_os << endl << "[~~~~] NC stateQ [~~~~]" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; jacky_os << endl << "[~~~~] NC outputQ [~~~~]" << endl << flush; outputQ.printOutputQ( jacky_os ); jacky_os << endl << endl << flush; #endif // cerr << "stateFound = " << stateFound << "[" // << (*stateFound) << "]" << endl; // cerr << "current state = [" // << *(state->current) << "]" <id() << " stateQ.size = " << state->getStateQueue()->size() << " / outputQ.size = " << outputQ.size() << endl; } //8> remove all future events from FC's inputQ ev_it = allFutureEvents.begin(); for( ; ev_it != allFutureEvents.end(); ev_it++ ){ //this will delete the BasicEvent object BasicTimeWarp::inputQ.removeStragglerEvent( (*ev_it), FC_localId); } #endif //end JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent [2006-02-13] ====================================================== //9> remove the "stateFound" from the NC's stateQ #ifdef JACKY_DEBUG jacky_os << endl << "== Remove stateFound [" << stateFound << "]" << endl << flush; #endif state->getStateQueue()->remove(stateFound); #ifdef JACKY_DESTROY_STATEFOUND //[2006-03-27] *********************************************** //explicitly destroy the stateFound! delete stateFound; //cerr << "NC " << this->id() << " delete stateFound!!!" 
<< endl; #endif //end JACKY_DESTROY_STATEFOUND //[2006-03-27] ***************************************** #ifdef JACKY_DEBUG jacky_os << endl << "[~~SEE~~] After removal of ALL events, NC outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); //print the current inputQ for the FC jacky_os << endl << "~~~~[~~SEE~~] FC inputQ [~~SEE~~]~~~~" << endl << flush; BasicTimeWarp::inputQ.printMiniInputQ( jacky_os, FC_localId ); jacky_os << endl << "~~~~[~~SEE~~] NC inputQ [~~SEE~~]~~~~" << endl << flush; BasicTimeWarp::inputQ.printMiniInputQ( jacky_os, this->localId ); jacky_os << endl << "~~~~[~~SEE~~] NC stateQ [~~SEE~~]~~~~" << endl << flush; state->printQ(jacky_os); jacky_os << endl << endl << flush; #endif //end JACKY_DEBUG } else { cerr << "\t\t\t\t\t\tfutureCIEvent = NULL, Strange!!! CHECK! -----" << endl; MASSERTMSG( false, "Check! NC: futureCIEvent = NULL !" ); } //end iff futureCIEvent != NULL #ifdef JACKY_NC_BP_STATE //[2006-02-02] //before re-sending the (X) messages in the NCBag, we need to set the LVT in the //current state to the time of the messages in the NCBag state->current->lVT = minTime; //cerr << "\t\tNC " << this->id() << " BP @ " << minTime.asString() << endl; //mark the state to be saved as a "Break-Point" state on the NC's stateQ state->current->breakpoint = true; #ifdef JACKY_DEBUG jacky_os << endl << "[2006-03-27] NC set current->lVT = " << (state->current->lVT).asString() << endl << flush; jacky_os << endl << "[BP_STATE_CHECK] NC BP state!!!" << endl << flush; jacky_os << endl << "[2006-03-27] NC set current->BP = TRUE!" << endl << flush; #endif //Note: 1. the "breakpoint" flag is defined in TimeWarpBasicState class // 2. this flag will be reset to false by the StateManager after saving the // copy of the current state on the NC's stateQ #endif //end JACKY_NC_BP_STATE [2006-02-02] //now resend these (X) messages in the NCBag //Note: this time, NO rollbacks should happen! 
unsigned int numOfMinTimeXmsgs2 = sendMinTimeNCBagMsgsToFC( minTime ); #ifndef JACKY_NC_BP_STATE //[2006-02-02] MASSERTMSG( ( (currentLVT == getLVT()) && (numOfMinTimeXmsgs2 == numOfMinTimeXmsgs) ), "NC: resends (X) in NCBag. But RB again/numOfMinTimeXmsgs changed!" ); #else //[2006-02-02] //getLVT() returns the LVT of the current state, we have advanced it to the minTime MASSERTMSG( ( (minTime == getLVT()) && (numOfMinTimeXmsgs2 == numOfMinTimeXmsgs) ), "NC: resends (X) in NCBag. But RB again/numOfMinTimeXmsgs changed!" ); #endif //end JACKY_NC_BP_STATE [2006-02-02] #ifdef JACKY_DEBUG jacky_os << "-=-=-=[====OOP====] (X) in NCBag with minTime = " << minTime.asString() << " are sent AGAIN!" << endl << flush; jacky_os << "BEFORE Removing minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << "2006-02-03 Current NCBagReference --->" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif //end JACKY_DEBUG #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //remove elements in the NCBagReference with key = NCBag.time() getBagRef().erase( getNCBag().time() ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] getNCBag().removeMessageWithMinTime(); //clear the (X) msgs in the bag we have sent #ifdef JACKY_DEBUG jacky_os << "AFTER Removing minTime msgs in NCBag, NCBag --->" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << "2006-02-03 Current NCBagReference --->" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif //end JACKY_DEBUG //[2006-02-03] //After resending the (X) messages in the NCBag, we need to check the NCBag again //to see whether there are still (X) messages in it. If so, we have to do: // 1. remove these (X) messages in the NCBag // 2. 
get the addresses of BasicEvents from which these (X) messgaes were // derived in the NCBagReference. // 3. unprocess these BasicEvents on the inputQ, set their alreadyProcessed // flags to false // 4. adjust currentObj[nc] & currentPos of the inputQ accordingly // 5. adjust the lVTArray[nc] to the current minTime //These operations should be done a time step after another until all (X) messages //are removed from the NCBag (NCBag.time = Inf) #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] if( getNCBag().size() > 0 ) { int ncLocalId = this->localId; #ifdef JACKY_DEBUG jacky_os << endl << "2006-02-03 ATTENTION! NON-EMPTY NCBAG! minTime = " << minTime.asString() << " / NCBag.time = " << getNCBag().time().asString() << " / NCBag.size = " << getNCBag().size() << " / NClocalId = " << ncLocalId << endl << flush; #endif //1. adjust lVTArray[ncLocalId] to the minTime if NCBag.size > 0 BasicTimeWarp::inputQ.resetLVTArrayTime(minTime, ncLocalId); #ifdef JACKY_NC_MULTI_STRAGGLER_FILEQ //[2006-04-26] ============================================================= //Jacky Note: //When we remove and undo the (X) messages left in the NCBag, we //also need to recover the log file for the NC since these pending //(X) messages have already been logged in the NC's outFileQ!!! //We need to get the min time of the (X) messages left in the NCBag //and rollback the file queue (log file) for the NC in order to //delete all data that have been logged for these (X)messages. #ifdef JACKY_DEBUG cerr << "NC " << this->id() << "[NC_LOG_PENDING_TS]: check fileQ!!" << endl; jacky_os << endl << "[NC_LOG_PENDING_TS] current NC log fileQ is ..." 
<< endl << flush; for (int i = 0; i < numOutFiles; i++) { jacky_os << "\t===== outFileQ[" << i << "] -> " << outFileQ[i].getOutFileName() << endl << flush; outFileQ[i].printFileQ(jacky_os); } jacky_os << endl << flush; #endif if( getNCBag().size() > 0 ) { //get the MIN time among these (X) messages VTime minimumBagTime = getNCBag().time(); //call rollbackFileQueues(minimumBagTime) to remove all //logged data with time >= this min time (i.e. all log data //for the pending (X) messages we are going to remove and undo) rollbackFileQueues(minimumBagTime); } #endif //end JACKY_NC_MULTI_STRAGGLER_FILEQ [2006-04-26] ========================================================= //2. unprocess all (X) messages in the NCBag while( getNCBag().size() > 0 ){ //the time of the (X) messages in NCBag we want to unprocess VTime currentBagTime = getNCBag().time(); //this time should be greater than the minTime MASSERTMSG( minTime < currentBagTime, "NC " + int2Str(this->id()) + " -- currentBagTime >= minTime!" 
); #ifdef JACKY_DEBUG jacky_os << "2006-02-03 Unprocess (X) message with time = " << currentBagTime.asString() << endl << flush; cerr << "\t\t\tNon-empty NCBag [Multi-straggler]: NC " << this->id() << " -> unprocess X event @ " << currentBagTime.asString() << endl; #endif //get the references NCBagReference::iterator refPos; //iterate over all BasicEvent with this time for(refPos = getBagRef().lower_bound(currentBagTime); refPos != getBagRef().upper_bound(currentBagTime); refPos++ ) { //for each of this BasicEvent address BasicEvent* pxEvent = refPos->second; #ifdef JACKY_DEBUG jacky_os << endl << "\t2006-02-03 unprocess BasicEvent [" << pxEvent << "]" << endl << flush; #endif BasicTimeWarp::inputQ.unprocessXEvent(pxEvent, ncLocalId); } //end for each BasicEvent //remove all (X) with currentBagTime from NCBag getNCBag().removeMessageWithMinTime(); //remove all refence with currentBagTime from NCBagReference getBagRef().erase( currentBagTime ); }//end while NCBag.size > 0 MASSERTMSG( getNCBag().size() == 0, "Errro: NC " + int2Str(this->id()) + " -- non-empty NCBag after unprocessing!" ); MASSERTMSG( getBagRef().size() == 0, "Error: NC " + int2Str(this->id()) + " -- non-empty NCBagRef after unprocessing!" ); ///////////////////////////////////////////////////////////////////// //MASSERTMSG( false, "Stop here: NC " + int2Str(ncLocalId) // + " -- check!" ); ///////////////////////////////////////////////////////////////////// #ifdef JACKY_DEBUG jacky_os << "Current inputQ is --> " << endl << flush; BasicTimeWarp::inputQ.printFullInputQ( jacky_os ); jacky_os << endl << "2006-02-03 ATTENTION! NON-EMPTY NCBAG PROCESSED!" << endl << endl << flush; #endif } //end if NCBag.size > 0 #endif //end JACKY_NCBAG_POINTERS [2006-02-03] //send out @ or * msg to the FC at the current minTime based on the current state, //which has been restored during previous rollbacks. 
//The absoluteNext() should be recalculated from the current (restored) state //now sends @/* msg if (minTime == absoluteNext()){ //have imminents, send @ CollectMessage collectMsg = CollectMessage( minTime, id() ); sendMsgType( ParallelNodeCoordinatorState::InternalMsg ); //Set Type to * send ( collectMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends (@) msg with NEW absoluteNext = " << absoluteNext().asString() << " ---- minTime = " << minTime.asString() << endl << flush; #endif } else { //No imminents, send * InternalMessage intMsg( minTime, id()); sendMsgType( ParallelNodeCoordinatorState::CollectMsg ); //Set Type to @ send ( intMsg, localFCId ); #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends (*) msg with minTime = " << minTime.asString() << " ---- absoluteNext = " << absoluteNext().asString() << endl << flush; #endif } #ifdef JACKY_DEBUG jacky_os << endl << "[~~SEE~~] After sending */@, NC outputQ is: " << endl << flush; outputQ.printOutputQ( jacky_os ); #endif } //end Multiple Stragglers } //end if Rollback happened } //end minTime > stopTime() } //end sendMsgType() == @ return *this; } #endif //end JACKY_RB_EVENT ------------------------------------------------------------------- /******************************************************************* * Function Name: receive(BasicOutputMessage *) * Description: this function processes the Y msg received from the local FC. * It finds out whether the Y msg should be output to the environment. * if so, the Y msg is forwarded to the Root; it also finds out whether * the Y msg is sending to remote simulators. if so, The NC checks what * remote NCs should receive the msg. 
It translates the received Y msg to * X msgs and relays them to the remote NCs ********************************************************************/ ParallelProcessor &ParallelNodeCoordinator::receive( const BasicOutputMessage *msg ){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "[" << ParallelMainSimulator::Instance().getMachineID() << "] NC procId = " << this->id() << " received (Y) msg at time = " << msg->time() << " / sender = " << msg->procId() << " / value = " << (msg->value())->asString() << " / destPort = " << (msg->port()).name() << endl << flush; #endif sortOutputMessage( *msg ); //all dirty jobs are done here delete msg; #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG jacky_os << "##### NC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(BasicExternalMessage *) * Description: this function receives (X) msgs coming from the other NCs. * it adds the coming (X) msgs into the MessageBag of this NC. * These (X) msgs will be forwared to the local FC in function * receive( const DoneMessage &) * If the NC is in the dormant state, this function will send the (X) msgs * in the bag out to the FC. Thus, restart the simulation on this machine! * Note: this may cause a rollback! we need to check whether a rollback has * happened, and clean our own data accordingly. This is done by * function rollbackCheck() ********************************************************************/ ParallelProcessor &ParallelNodeCoordinator::receive( const BasicExternalMessage *msg) { //Note: //MessageBag can only hold msgs with the same time stamp. 
Since there is NO //synchronization among NCs on different machines, it's perfectly prossible //that (X) msgs with different time stamps will be received by this NC. //As a result, we will use NCMessageBag (a special subclass of MessageBag) rather //than using the MessageBag directly //externalMsgs.add( msg ); //Wrong! should use NCMessageBag instead getNCBag().add( msg ); #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //Note: the reference for this (X) message should have already been inserted into the // NCBagReference by the ParallelProcessor:executeProcess() //let's check the size of the NCBagReference here MASSERTMSG( getNCBag().size() == (int)getBagRef().size(), "NC::receive(X) -> NCBag.size != BagRef.size, Processor does not insert reference?!" ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #ifdef JACKY_X_SKIP_SAVING // Nov. 23, 2005 ############################################################### skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG //------------------------------------------------------------------ ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "##### NC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG -------------------------------------------------------------- #endif //end JACKY_X_SKIP_SAVING ########################################################################## #ifdef JACKY_DEBUG jacky_os << "\t[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " puts (X) msg in NCBag / time = " << (msg->time()).asString() << " / senderId = " << msg->procId() << " / port = " << (msg->port()).name() << " / value = " << ( (RealMsgValue*)(msg->value()) )->asString() << " / senderModelId = " << msg->senderModelId() << endl << flush; jacky_os << "$$$$[" << ParallelMainSimulator::Instance().getMachineID() << "] NC " << this->id() << " " << "BagTime=(" << getNCBag().time().asString() << ")" << endl << flush; 
jacky_os << "Current NCBag is:" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << "2006-02-03 Current NCBagReference is:" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif /* ** [2006-04-03] Jacky Note: the following description is exactly the case in Multiple-Straggler ** situation with pending Time Slices! ** ** Jacky Note: Nov. 5, 2005 ** In some situations, we have to send out the (X) messages in the bag followed by a (*) message ** even when we are not dormant! ** For example, after rolling back to time 1000, the inputQ on machine 0 has the following events ** that have not yet been processed: ** eventId processed? ** ... ** [277] FC0 -> NC0 @ 1000 (1) //the last finished event after the rollback of NC0 ** [ 68] NC1 -> NC0 @ 1000 (0) ** [ 70] NC1 -> NC0 @ 1000 (0) ** [ 79] NC1 -> NC0 @ 2000 (0) ** [ 81] NC1 -> NC0 @ 2000 (0) ** [107] NC0 @ 1000 -> FC0 @ 2000 (0) //this is the event that has been unprocessed during the rollback ** Now, since the inputQ is ordered as above, the events will be processed in this order. As a result, ** event [68] & [70] are put into the NCBag, then event [79] & [81] are put into the bag as well (Hence, ** the local time of NC0 is advanced to 2000, the recvTime of event [79] & [81]), and finally, event [107] ** is executed, which will advance the simulation time on FC0 and all other simulators on machine0 to 2000. ** This can cause a big problem: when all simulators on machine0 have finished their execution, a (D) ** message will be sent from FC0 to NC0 with a recvTime of 2000. Then, NC0 finds that the MIN time is 1000, ** which is the time of the first two messages in the NCBag (for event [68] & [70]). Therefore, NC0 will ** try to send these two (X) messages followed by a (*) message to FC0. However, this will cause FC0 and all ** processors being rolled back to time 000! 
In our case, NC0 tries to insert event ** [155] NC0 @ 2000 -> FC0 @ 1000 //the 1st (X) message in the NCBag corresponding to event [68] ** into the inputQ. After this round of unnecessary rollback, this straggler message is actually inserted ** and an extra (and wrong) state is saved into the stateQ for NC0. The result of this rollback will result ** in an exception to be thrown later on. */ //now, check whether there are no more unprocessed events on the inputQ. If so, and we have //(X) msgs in the NCBag, we will send them out and reactivate the simulation! /*Jacky Note: ** To avoid unnecessary rollbacks, we need to send out (X) messages in the NCMessageBag followed by ** a (*) message just after processing all unprocessed messages in the inputQ with the smallest time ** stamp! ** For example: the NC is now dormant, and the inputQ has the following unprocessed messages: ** [1]NC1@1000=>this@1000, [2]NC1@1000=>this@1000, [3]NC2@1000=>this@1000, [4]NC1@2000=>this@2000 ... ** We should process message [1], [2], and [3] (inserting them into our NCMessageBag). Then, we need to ** send these 3 (X) messages to the FC followed by a (*) message with time 1000 BEFORE we process ** message [4]! ** i.e. we send (X) & (*) messages to the FC when we detect that there is a time change (for example, ** from 1000 to 2000) between the time of the (X) messages already in the NCMessageBag and the time of ** the next message in the inputQ that has not yet been processed (inserted into our NCMessageBag) ** Oct. 29, 2005 */ if( isDormant() //this NC is dormant && (getNCBag().size() > 0) //bag has (X) messages in it && ( (getNCBag().time() < stopTime()) || (getNCBag().time() == stopTime()) ) //(X) messgaes in the bag has time <= the stop time && (!hasUnprocessedEventWithSmallestTimeBeforeStop()) //all events with the smallest time processed ){ #ifdef JACKY_DEBUG jacky_os << "[CHECK!!!] 
NC DORMANT --> Send (X) & (*) messages" << endl << flush; #endif //reset the flag to FALSE & set lastChange and nextChange for the (*) msg isDormant(false); //send out (X) messages in the NCBag followed by a (*) message sortMessagesInBag(); #ifdef JACKY_X_SKIP_SAVING // Nov. 23, 2005 //Note: we have set skipStateSaving = true at the beginning of this function. If the NC is in // Dormant state, we will send out (X) messages in the NCBag followed by a (*) message. // In this case, we have to save the current state!!! skipStateSaving = false; //reset the flag to FALSE if we send out any message #ifdef JACKY_DEBUG jacky_os << "##### NC::Dormant reset skipStateSaving = FALSE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_X_SKIP_SAVING } //end if return *this; } #ifdef JACKY_DEBUG /******************************************************************* * Function Name: showEvents() * Description: output the content of the EventList to the debug file ********************************************************************/ void ParallelNodeCoordinator::showEvents(EventList &events, ostream &out){ //let's show the external events Nov. 10, 2005 out << "eeee------------------- EventList in NC: -------------------------------eeee" << endl << flush; if( events.size() == 0 ){ out << "\tEventList is EMPTY!" 
<< endl << flush; } else { EventList::const_iterator event_it = events.begin(); int eventIndex = 0; for(; event_it != events.end(); event_it++){ out << "[" << eventIndex << "]" << flush; if( event_it == currentEvent() ){ out << "<*>" << flush; } out << event_it->asString() << endl << flush; eventIndex++; } if( currentEvent() != events.end() ){ out << "---- eventsCursor = " << currentEvent()->asString() << " ----" << endl << flush; } else { out << "---- eventsCursor = END ----" << endl << flush; } } out << "eeee------------------------------------------------------------------eeee" << endl << flush; } #endif //end JACKY_DEBUG #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] ---------------------- //function to print out the NCBagReference void ParallelNodeCoordinator::printNCBagRef( ostream &out ) { if( getBagRef().size() > 0 ){ ParallelNodeCoordinator::NCBagReference::const_iterator pos = getBagRef().begin(); for( ; pos != getBagRef().end(); pos++ ){ //print out the recvTime of the BasicEvent, and its address out << "<" << (pos->first).asString() << ", " << pos->second << "> " << flush; } out << endl << flush; } else { out << "-> EMPTY!" << endl << flush; } } #endif //end JACKY_NCBAG_POINTERS [2006-02-03] ------------------ #ifdef JACKY_RB_EVENT //eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee //private function for sending all external events with minTime in the EventList void ParallelNodeCoordinator::sendMinTimeEventsToFC( const VTime &minTime ){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "1> before: CHECK THE CURRENT EVENT LIST --> " << endl << flush; showEvents(events(), jacky_os); #endif //send out all the external events with the given MIN time while ( !endOfEvents() && (minTime == (currentEvent()->time)) ) { Event ev = *(currentEvent()); //current event //Note: we should only send out external events that will influence a local simulator! 
// If an external event has nothing to do with a local simulator, we will ignore // it (it will be handled by other NCs) if( isSendingToLocalFC(*ev.port) ){ //this event will be received by a local simulator ExternalMessage extMsg( ev.time, id(), *ev.port, ev.value.value() ); extMsg.senderModelId( model().id() ); #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ extMsg.isEvent(true); #endif // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " : sends EXTERNAL EVENT / eventTime = " << extMsg.time() << " / port = " << extMsg.port().name() << "@" << extMsg.port().model().asString() << " / value = " << extMsg.value() << endl << flush; #endif //Note: since the external events are known in advance, this send() will // NOT cause a rollback on the receiving FC send( extMsg, localFCId ); } //end if eventsMoveNext(); } //end while #ifdef JACKY_DEBUG jacky_os << "2> after: CHECK THE CURRENT EVENT LIST --> " << endl << flush; showEvents(events(), jacky_os); #endif } //private function for sending all (X) messages with minTime in the NCBag //return the number of minTime (X) messages in the NCBag unsigned int ParallelNodeCoordinator::sendMinTimeNCBagMsgsToFC( const VTime &minTime ){ unsigned int numOfMinTimeXmsgs = 0; //send out all (X) msgs in the NCMessageBag with this MIN time if( (getNCBag().size() > 0) && (minTime == getNCBag().time()) ){ //get these (X) msgs MessageBag::MessageList xMsgList = getNCBag().getMessageWithMinTime(); numOfMinTimeXmsgs = xMsgList.size(); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "$$$$ NC " << this->id() << " NCBag has {" << numOfMinTimeXmsgs << "} messages with MINtime (" << minTime.asString() << ")" << endl << flush; jacky_os << "$$$$ current NCBag is:" << endl << flush; jacky_os << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] jacky_os << 
"$$$$ 2006-02-03 Current NCBagReference is:" << endl << flush; printNCBagRef( jacky_os ); jacky_os << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] #endif MessageBag::MessageList::const_iterator xMsgCursor = xMsgList.begin(); for( ; ( xMsgCursor != xMsgList.end() ) && ( minTime == getNCBag().time() ); //ensure the pointers in xMsgList are still valid xMsgCursor++ ) { //for each (X) msg in the NCMsgBag with the same MIN time, sends it to the FC /*Note: ** if send(extMsg, localFCId) triggers a rollback on the FC and the resulting ** secondary rollbacks happen on this NC, the NCMessageBag may be cleaned by ** function rollbackProcessorVariables(), and thus the pointers in the xMsgList ** are deleted (invalidated)! Therefore, we have to check this here!!! ** Oct. 26, 2005 */ const BasicMsgValue* pmsgVal = (*xMsgCursor)->value(); ExternalMessage extMsg( (*xMsgCursor)->time(), id(), (*xMsgCursor)->port(), ((RealMsgValue*)(pmsgVal))->v ); const BasicExternalMessage* xmsgPtr = dynamic_cast(*xMsgCursor); extMsg.senderModelId( xmsgPtr->senderModelId() ); #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ extMsg.isEvent( (*xMsgCursor)->isEvent() ); #endif // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #ifdef JACKY_DEBUG jacky_os << "\tNC " << this->id() << " sends out (X) msg in NCBag / time = " << ((*xMsgCursor)->time()).asString() << " / senderId = " << id() << " / port = " << ((*xMsgCursor)->port()).name() << " / value = " << ((RealMsgValue*)(pmsgVal))->asString() << " / senderModelId = " << xmsgPtr->senderModelId() << endl << flush; #endif //Note: IMPORTANT!!! This send() is going to forward (X) from other NCs to the // local FC, and this may result in a rollback on the local FC!!! // After the possible rollback, the state of this NC may be restored to // a previously stored state! 
Therefore, we cannot take (sendMsgType() == // ParallelNodeCoordinatorState::CollectMsg) as granted!!! //Oct. 19, 2005 Jacky send( extMsg, localFCId ); } //end for } //end processing (X) msgs in the NCBag return numOfMinTimeXmsgs; } //private function to check whether there is a @ message with recvTime > the given minTime that has been //unprocessed on the current inputQ. This fucntion is used by receive(DoneMessage) to handle the situation //of multiple straggler messages after rollbacks //return the unprocessed @/* message or NULL if there is no such message BasicEvent* ParallelNodeCoordinator::findUnprocessedCollectORInternalMessageAfter( const VTime& minTime ){ BasicEvent* tempMsg = NULL; int num = 0; //counter BasicEvent* eventCursor = (BasicEvent*)(BasicTimeWarp::inputQ.getHead()); //go through the inputQ TWMessage* nextToExecuteMsg; Message* nextToExecuteCDMsg; while(eventCursor != NULL){ if( (eventCursor->alreadyProcessed == false) && (minTime < eventCursor->recvTime) ){ //unprocessed and after the minTime nextToExecuteMsg = (TWMessage*)eventCursor; nextToExecuteCDMsg = nextToExecuteMsg->getMessage(); if( (nextToExecuteCDMsg->type() == "@") || (nextToExecuteCDMsg->type() == "*") ) {// @ or * msg tempMsg = eventCursor; //save the last occurance of such event num++; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); if(nextToExecuteCDMsg->type() == "@"){ jacky_os << endl << "\t[-=-=OOP-=-=]-> unprocessed @ message found [" << flush; } else { jacky_os << endl << "\t[-=-=OOP-=-=]-> unprocessed * message found [" << flush; } jacky_os << num << "] <" << " id=" << tempMsg->eventId << " sign=" << tempMsg->sign << " " << tempMsg->sender << "@" << tempMsg->sendTime << " =" << nextToExecuteCDMsg->type() << "=> " << tempMsg->dest << "@" << tempMsg->recvTime << " alreadyProcessed?" 
<< tempMsg->alreadyProcessed << " > tempADD = " << tempMsg << " cursorADD = " << eventCursor << endl << flush; #endif } //end if type = @/* #ifdef JACKY_DESTROY_MESSAGE delete nextToExecuteCDMsg; #endif } //end if unprocessed & recvTime > minTime eventCursor = eventCursor->next; } //end while //there should be only ONE such event MASSERTMSG( num == 1, "NC::findUnprocessedCollectORInternalMessageAfter() -> number of futureCIEvent is not 1 !" ); return tempMsg; } //function to find all messages that have been sent before saving a certain state list< Container* > ParallelNodeCoordinator::findAllOutputMessagesForState( const BasicState* thisState){ list< Container* > allMessages; //1> save the outputPos in thisState Container* endPoint = thisState->outputPos; //2> get the 1st Container that contains the 1st message sent out before saving thisState // (i.e. the Container just AFTER the outputPos in the previous state) Container* startPoint = thisState->prev->outputPos->next; //3> copy pointers to Containers in the range of [startpoint, endPoint) in the NC's outputQ Container* cursor = startPoint; while( cursor != endPoint ){ allMessages.push_back( cursor ); cursor = cursor->next; //move to the next Container } //4> now, push the endPoint into the list allMessages.push_back( endPoint); return allMessages; } //function to find those events that are derived from external events from a list of pointers to Containers list< BasicEvent* > ParallelNodeCoordinator::findMessagesDerivedFromExternalEventsFromContainers(const list< Container* > &messages) { list< BasicEvent* > externalEventMessages; list< Container* >::const_iterator con_it = messages.begin(); for( ; con_it != messages.end(); con_it++ ){ //get the BasicEvent* BasicEvent* event = (*con_it)->object; TWMessage* eventMsg = (TWMessage*)event; if( eventMsg->derivedFromExternalEvent() == true){ //this is a message derived from external event externalEventMessages.push_back(event); } } #ifdef JACKY_DEBUG VTime eventTime; 
if( externalEventMessages.size() > 0 ) { eventTime = externalEventMessages.front()->recvTime; list< BasicEvent* >::const_iterator ev_it = externalEventMessages.begin(); for( ; ev_it != externalEventMessages.end(); ev_it++ ){ MASSERTMSG( (eventTime == (*ev_it)->recvTime), "NC::findMessagesDerivedFromExternalEventsFromContainers() -> event time NOT EQUAL!" ); } } #endif return externalEventMessages; } #ifdef JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent //[2006-02-13] ======================================================= list< BasicEvent* > ParallelNodeCoordinator::findMessagesDerivedFromExternalEventsFromBasicEvents( const list< BasicEvent* > &messages ){ list< BasicEvent* > externalEventMessages; list< BasicEvent* >::const_iterator con_it = messages.begin(); for( ; con_it != messages.end(); con_it++ ){ BasicEvent* event = (*con_it); TWMessage* eventMsg = (TWMessage*)(event); if( eventMsg->derivedFromExternalEvent() == true){ //this is a message derived from external event externalEventMessages.push_back(event); } } #ifdef JACKY_DEBUG VTime eventTime; if( externalEventMessages.size() > 0 ) { eventTime = externalEventMessages.front()->recvTime; list< BasicEvent* >::const_iterator ev_it = externalEventMessages.begin(); for( ; ev_it != externalEventMessages.end(); ev_it++ ){ MASSERTMSG( (eventTime == (*ev_it)->recvTime), "NC::findMessagesDerivedFromExternalEventsFromContainers() -> event time NOT EQUAL!" ); } } #endif return externalEventMessages; } //function to find all unprocessed event that were sent to the FC along with the futureCIEvent on the inputQ //this function searches the current inputQ to find all unprocessed events that have the following attributes: //1. event->recvTime = futureCIEvent->recvTime //2. event->sendTime = futureCIEvent->sendTime //3. event->alreadyProcessed = false //4. event->sender = futureCIEvent->sender & event->dest = futureCIEvent->dest //Note: the returned list from this function includes the futureCIEvent! 
// //function findAllOutputMessagesForState() uses reference to stateFound->outputPos and the outputPos of the state //before the stateFound on the NC's stateQ to determine all future events (as objects in their containers), this //is not safe! Since the state before stateFound and the stateFound itself may be garbage collected when we find //them, the outputPos pointers in these 2 state may be NULL. Also, the state before stateFound may has already been //deleted from the NC's stateQ. Thus, we use this function to find all future events directly from the inputQ. list< BasicEvent* > ParallelNodeCoordinator::findAllFutureEvents( const BasicEvent* futureCIEvent ){ list< BasicEvent* > allFutureEvents; BasicEvent* eventCursor = (BasicEvent*)(BasicTimeWarp::inputQ.getTail()); //go through the inputQ backward while(eventCursor != NULL){ if( (eventCursor->alreadyProcessed == false) && //unprocessed (eventCursor->sendTime == futureCIEvent->sendTime) && //same sendTime as the futureCIEvent (eventCursor->recvTime == futureCIEvent->recvTime) && //same recvTime as the futureCIEvent (eventCursor->sender == futureCIEvent->sender) && //same sender (the NC) (eventCursor->dest == futureCIEvent->dest) //same receiver (the FC) ){ allFutureEvents.push_back( eventCursor ); } eventCursor = eventCursor->prev; } //end while return allFutureEvents; } //function to find all containers on the NC's outputQ that contain the BasicEvent* as the given futureEvents //search the NC's outputQ backward to find the addresses of the containers that contain the futureEvents //if the outputQ is empty, just return the empty allOutputMessages list< Container* > ParallelNodeCoordinator::findAllOutputMessagesForFutureEvents( const list< BasicEvent* >& futureEvents ){ list< Container* > allOutputMessages; //in any case, the number of the futureEvents should not be zero MASSERTMSG( futureEvents.size() != 0, "NC::findAllOutputMessagesForFutureEvents() -> input futureEvents.size = 0!" 
); if( outputQ.size() != 0 ){ //NC's outputQ is not empty, search it backward Container* containerPos = outputQ.getTail(); //the tail should not be NULL for an non-empty outputQ MASSERTMSG( containerPos != NULL, "NC::findAllOutputMessagesForFutureEvents() -> NC outputQ size != 0 but tail == NULL" ); //an event cursor to iterate through all future events list::const_iterator eventCursor = futureEvents.begin(); for( ; eventCursor != futureEvents.end(); eventCursor++ ){ //go through the future events BasicEvent* currentFutureEvent = *eventCursor; //the current future event we are looking for while( containerPos != NULL ){ if( containerPos->object == currentFutureEvent ){ //the container for the current future event is found //1. insert the address the Container into the allOutputMessages allOutputMessages.push_back(containerPos); //2. break the while loop and search for the next future event from tail again break; } containerPos = containerPos->prev; } //end while [iterate over the NC's outputQ] //set containerPos to the Tail of the outputQ again containerPos = outputQ.getTail(); } //end for [iterate over the future events] //the number of the Containers found should be the number of the input futureEvents MASSERTMSG( allOutputMessages.size() == futureEvents.size(), "NC::findAllOutputMessagesForFutureEvents() -> NC allOutputMessages.size != futureEvents.size" ); } //end if outputQ.size() != 0 //if the NC's outputQ is empty since the containers have already been garbage collected //this will return the empty "allOutputMessages" return allOutputMessages; } #endif //end JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent [2006-02-13] ====================================================== //function to roll back the eventCursor of the EventList //eventCursor should be rolled back to the 1st event with the given "eventTime" void ParallelNodeCoordinator::rollbackExternalEvents( VTime &eventTime ){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os 
<< "NC::rollbackExternalEvents() eventTime = " << eventTime.asString() << endl << flush; #endif if( events().size() > 0 ){ //int steps = 0; if( !endOfEvents() ) { #ifdef JACKY_DEBUG jacky_os << "\tcurrentEvent != END" << endl << flush; #endif while( ((eventTime < (*currentEvent()).time) || (eventTime == (*currentEvent()).time)) && (currentEvent() != events().begin()) ){ eventsMovePrev(); //move backward } #ifdef JACKY_DEBUG jacky_os << "\t--> move backward to LAST event with LESS time / BEGIN ..." << endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif //now cursor is at the beginning of the list or the last element with time < eventTime if( (*currentEvent()).time < eventTime ){ #ifdef JACKY_DEBUG jacky_os << "\t--> current time < eventTime, before move forward..." << endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif eventsMoveNext(); //move forward #ifdef JACKY_DEBUG jacky_os << "\t--> after move forward ..." << endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif } else { //currentEvent.time >= eventTime if( eventTime < (*currentEvent()).time ) { MASSERTMSG( false, "NC::rollbackExternalEvents() -> ERROR! check event list" ); } if( eventTime == (*currentEvent()).time ){ //we must at the BEGIN MASSERTMSG( currentEvent() == events().begin(), "NC::rollbackExternalEvents()->ERROR! currentEvent() should be events().begin()!"); } } } else { //currentEvent() has already beyond the END of the EventList #ifdef JACKY_DEBUG jacky_os << "\tcurrentEvent = END!" << endl << flush; #endif eventsMovePrev(); //move to the last event #ifdef JACKY_DEBUG jacky_os << "\t--> move to the last event ..." 
<< endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif while( ((eventTime < (*currentEvent()).time) || (eventTime == (*currentEvent()).time)) && (currentEvent() != events().begin()) ){ eventsMovePrev(); //move backward } #ifdef JACKY_DEBUG jacky_os << "\t--> move backward to LAST event with LESS time / BEGIN ..." << endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif //now cursor is at the beginning of the list or the last element with time < eventTime if( (*currentEvent()).time < eventTime ){ #ifdef JACKY_DEBUG jacky_os << "\t--> current time < eventTime, before move forward..." << endl << flush; jacky_os << "\t\tresult = " << currentEvent()->asString() << endl << flush; #endif eventsMoveNext(); //move forward #ifdef JACKY_DEBUG jacky_os << "\t--> after move forward ..." << endl < ERROR! check event list" ); } if( eventTime == (*currentEvent()).time ){ //we must at the BEGIN MASSERTMSG( currentEvent() == events().begin(), "NC::rollbackExternalEvents()->ERROR! currentEvent() should be events().begin()!"); } } } } else { MASSERTMSG( false, "NC::rollbackExternalEvents() -> EventList is EMPTY! This function should not be called." ); } } #endif //eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee //[2006-03-26] //Jacky Note: Since the NC will use InfreqStateManager as a normal StateManager and will NOT do coast forward //operations during rollback, this function should do nothing! /* #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] //Jacky Note: this function is to recover the variables defined in processors // such as the NCMessageBag before the coast forward operations inline void ParallelNodeCoordinator::recoverProcessorVariables( ){ //we need to get the BasicEvent* from the bagref in the current state, and add (X) Message objects //into the NCMessageBag based on info. 
extracted from these BasicEvents #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[InfreqStateManager] NC::recoverProcessorVariables( ) called *************" << endl << flush; jacky_os << "\tcurrent NCBag is: " << endl << flush; jacky_os << getNCBag() << endl << flush; jacky_os << "--------------------------------------------------------------------------------" << endl << flush; jacky_os << "\tNCBagRef in the currentState is: " << endl << flush; printNCBagRef( jacky_os ); #endif //Note: rollbackProcessorVariables(rollbackTime) will rollback the NCBag to the status just before rollbackTime //i.e. it's the status at the end of the time slice before TS //This status may not be the same as the status that should be recovered from the BagRef during coast forward //we need to empty the NCBag() first! if( getNCBag().size() > 0 ){ getNCBag().eraseAll(); #ifdef JACKY_DEBUG jacky_os << endl << "[InfreqStateManager] NCBag cleaned!!! ************************************" << endl << flush; #endif } //then, we recover the NCBag based solely on the BagRef restored from the state! 
//iterate over the NCBagRef, create BasicExternalMessage object and add them into NCBag if( getBagRef().size() > 0 ){ ParallelNodeCoordinatorState::NCBagReference::const_iterator pos = getBagRef().begin(); BasicEvent* externalMessagePtr; for( ; pos != getBagRef().end(); pos++ ){ externalMessagePtr = pos->second; //get the BasicEvent* TWMessage* extMsg = (TWMessage*)externalMessagePtr; //cast to TWMessage //create a new BasicExternalMessage object BasicExternalMessage *xmsg = ((TWExternalMessage *)extMsg)->getMessage(); //add this BasicExternalMessage object into the NCBag getNCBag().add(xmsg); } //end for }//end if #ifdef JACKY_DEBUG jacky_os << endl << "[InfreqStateManager] NCBag is recovered from NCBagRef *************" << endl << flush; jacky_os << "\tcurrent NCBag is: " << endl << flush; jacky_os << getNCBag() << endl << flush; jacky_os << "--------------------------------------------------------------------------------" << endl << flush; jacky_os << "\tNCBagRef in the currentState is: " << endl << flush; printNCBagRef( jacky_os ); #endif //At the end of the day, the NCBag & BagRef should contain the same number of elements MASSERTMSG( getNCBag().size() == (int)getBagRef().size(), "NC::recoverProcessorVariables -> at the end, NCBag.size != BagRef.size" ); } #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER [2006-03-15] */