/*******************************************************************
*  Declaration: This class is partially based on Glinsky's version that
*               can be found in the "oldsource" directory.
*
*  DESCRIPTION: class ParallelFlatCoordinator
*
*  AUTHOR: Qi (Jacky) Liu
*
*  EMAIL: mailto://liuqi@sce.carleton.ca
*
*  DATE: August, 2005
*
*******************************************************************/

#include "pfcoord.h"        // header file
#include "pcoordin.h"       // definition of ProcSet
#include "pFCoordState.cpp"
#include "evaldeb.h"        // ShowVirtualTimeWhenFinish
#include "message.h"        // class Message
#include "pmessage.h"
#include "pmodeladm.h"      // class SingleParallelModelAdm
#include "coupled.h"        // class Coupled
#include "parsimu.h"        // class ParallelMainSimulator
#include "log.h"            // class Log

/*private functions*/

/*******************************************************************
* Function Name: private constructor
* Description: the ParallelProcessorAdmin is a friend class of this
*              ParallelFlatCoordinator; this private constructor is only
*              used by the ParallelProcessorAdmin to generate a FC.
*              It forwards the model to the ParallelProcessor base and
*              caches the procId of the local Node Coordinator (NC).
********************************************************************/
ParallelFlatCoordinator::ParallelFlatCoordinator( FlatCoordinatorModel* ncmdl )
    : ParallelProcessor( ncmdl )
{
    // parentNCId is the procId of the local NC; it is read from the
    // simulator's nodeCoordinatorsList entry for this machine (the list
    // is set in parsimu.cpp when the FC and NC are created)
    parentNCId = ParallelMainSimulator::Instance().nodeCoordinatorsList[ParallelMainSimulator::Instance().getMachineID()];
}

/*******************************************************************
* Function Name: addLocalDependants()
* Description: add LOCAL MASTER ParallelSimulators for Atomic models on
*              the current machine as the dependants of this FC.
*              Every registered dependant starts with the time VTime::Inf.
* Returns: *this
********************************************************************/
ParallelFlatCoordinator &ParallelFlatCoordinator::addLocalDependants()
{
    const ParallelModelAdmin::ModelDB& models( SingleParallelModelAdm::Instance().models() );

    for( ParallelModelAdmin::ModelDB::const_iterator entry = models.begin(); entry != models.end(); ++entry )
    {
        Model* model = entry->second;

        // Bind the partition once instead of calling modelPartition() for
        // both begin() and end() on every iteration.
        const ModelPartition& partition = model->modelPartition();

        for( ModelPartition::const_iterator mcursor = partition.begin(); mcursor != partition.end(); ++mcursor )
        {
            // Only a processor that is both the local one and the master one
            // of an atomic model (actually, atomic or cell) becomes a
            // dependant of this FlatCoordinator.
            if( (mcursor->second == model->localProc())
                && (mcursor->second == model->masterId())
                && model->isAtomic() )
            {
                //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] Model: "
                //     << model->description() << "(" << model->className() << ") machine: "
                //     << mcursor->first << " procId: " << mcursor->second << endl;
                dependants()[model->localProc()] = VTime::Inf;
            }
            //else {
            //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] (NOT ADDED) Model: "
            //     << model->description() << "(" << model->className() << ") machine: " << mcursor->first
            //     << " procId: " << mcursor->second << endl;
            //}
        }
    }
    return *this;
}

/*******************************************************************
* Function Name: findLocalInfluenceList()
* Description: find all receiving ports on local atomic models whose
*              associated simulators are dependants of this FC, and
*              append these ports to the "localInfluList" kept in the state.
*              Ports that sit on coupled models are followed recursively.
* Parameter: destPort - the port whose influences are traced
* Returns: *this
********************************************************************/
ParallelFlatCoordinator& ParallelFlatCoordinator::findLocalInfluenceList(const Port& destPort){
    // Bind by const reference (as the sibling tracing functions do) so the
    // list is not copied on every recursive call.
    const InfluenceList& influList( destPort.influences() );

    //iterate over all ports that will receive msg from the specified port
    for( InfluenceList::const_iterator cursor = influList.begin(); cursor != influList.end(); ++cursor ){
        if( !((*cursor)->model()).isAtomic() ){
            //the receiving port is on a coupled model:
            //search recursively until an atomic receiving model is found
            findLocalInfluenceList( *(*cursor) );
            continue;
        }

        //the receiving port is on an atomic model: get the procId of the
        //corresponding simulator
        const ProcId simulatorProcId = ((*cursor)->model()).localProc();

        //only simulators that are dependants of this FC are recorded
        if( dependants().find( simulatorProcId ) != dependants().end() ){
            getLocalInfluList().push_back( (*cursor) );   // new version Oct. 19, 2005
        }
    }
    return *this;
}

/*******************************************************************
* Function Name: isSendingToEnvironment()
* Description: test whether the specified port will eventually connect
*              to an output port of the TOP model.
*              Receiving ports on atomic models are ignored. A receiving
*              port on a coupled model whose description() is "top" ends
*              the search with true; any other coupled model is followed
*              recursively.
* Parameter: destPort - the port whose influences are traced
* Returns: true as soon as one path reaches the "top" model, false otherwise
********************************************************************/
bool ParallelFlatCoordinator::isSendingToEnvironment(const Port& destPort){
    //all ports that will receive the Y msg
    const InfluenceList &influList( destPort.influences() );

    for( InfluenceList::const_iterator cursor = influList.begin(); cursor != influList.end(); ++cursor ) {
        if( ((*cursor)->model()).isAtomic() ) {
            continue;   //atomic receivers are not of interest here
        }
        if( ((*cursor)->model()).description() == "top" ){
            return true;   //receiving port belongs to the TOP model
        }
        if( isSendingToEnvironment( *(*cursor) ) ) {
            return true;   //a deeper path reached the TOP model
        }
    }
    return false;
}
/******************************************************************* * Function Name: isSendingToRemoteSimulator() * Description: test whether the specified port will eventually connect to remote simulators on other machines ********************************************************************/ bool ParallelFlatCoordinator::isSendingToRemoteSimulator(const Port& destPort){ //we need to recersively trace the influenceList of the dest port until atomic models (simulators) are reached //to see whether the destination simulator is not on this machine. Whenever a remote receiving simulator is //found, break the loop and return true. bool result = false; const InfluenceList &influList( destPort.influences() ); //all ports that will receive the Y msg InfluenceList::const_iterator cursor( influList.begin() ); for(; cursor != influList.end(); cursor++) { if(((*cursor)->model()).isAtomic()) { //if the receiving port is on an atomic model //get the procId of the corresponding simulator ProcId simulatorProcId = ((*cursor)->model()).localProc(); //check whether this simulator is a dependant of this FC if(dependants().find(simulatorProcId) == dependants().end()){ //one remote receiving simulator found result = true; break; //break the for loop } } else { //the receiving port is on a coupled model //we need to search recersively until an atomic receiving model is found result = isSendingToRemoteSimulator(*(*cursor)); if(result) { break; //break the for loop } } } return result; } #ifdef JACKY_DEBUG /******************************************************************* * Function Name: printLocalDependants() * Description: print out the local dependants of this FC for debugging purpose * The FC's dependants are setup in initialize() function ********************************************************************/ void ParallelFlatCoordinator::printLocalDependants(ostream& out){ ParallelFlatCoordinatorState::DependantList& dependants = this->dependants(); 
ParallelFlatCoordinatorState::DependantList::const_iterator cursor; out << "-------------------Local Dependants of the FC : ------------------------------------" << endl; for(cursor = dependants.begin(); cursor != dependants.end(); cursor++){ ProcId depId = cursor->first; VTime localTime = cursor->second; out << "dependant procId = " << depId << " with localTime @" << localTime.asString() << endl; } out << "---------------------------------------------------------------------------------------" << endl; } #endif //end JACKY_DEBUG /*protected functions*/ /******************************************************************* * Function Name: calculateNextChange * Description: calculate the MIN nextChange() time among dependants ********************************************************************/ VTime ParallelFlatCoordinator::calculateNextChange(const VTime& time) const { VTime next(VTime::Inf); ParallelFlatCoordinatorState::DependantList::const_iterator cursor; //find the MIN absoluteNext() time among all the local dependants for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) if( cursor->second < next) next = cursor->second; next -= time; //subtract the current time (msgTime) from this MIN time to get the MIN nextChange() time return next; } /******************************************************************* * Function Name: sortExternalMessage() * Description: analyze the X msg in the bag. Used by receive(const InternalMessage &) ********************************************************************/ ParallelFlatCoordinator &ParallelFlatCoordinator::sortExternalMessage( const BasicExternalMessage &msg ) { MASSERTMSG( getLocalInfluList().size() == 0, "FC::sortExternalMessage() -> Garbage in FC::getLocalInfluList()!" 
); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "*** FC entering sortExternalMessage(Xmsg) --> getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; #endif //*** populate the localInfluList *** findLocalInfluenceList( msg.port() ) ; MASSERTMSG( getLocalInfluList().size() > 0, "FC::sortExternalMessage() -> Dummy (X) msg received!" ); InfluenceList::const_iterator cursor( getLocalInfluList().begin() ) ; BasicExternalMessage xMsg( msg ) ; xMsg.procId( id() ) ; //if there is a local receiving port, send the X msg to this port, and add the procId of the //receiving simulator to the synchronizeList; otherwise, send out no msgs! for( ; cursor != getLocalInfluList().end(); cursor++ ) { getSynchronizeList().insert( (*cursor)->model().localProc()); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "*** \tFC sortExternalMessage(Xmsg) --> getSynchronizeList() insert procId: " << (*cursor)->model().localProc() << "***" << endl << flush; #endif xMsg.port( *(*cursor) ) ; send( xMsg,(*cursor)->model().localProc() ) ; //send the X msg to the local receiving port }//end for //*** clear the localInfluList *** getLocalInfluList().clear(); #ifdef JACKY_DEBUG jacky_os << "*** FC leaving sortExternalMessage(Xmsg) --> getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; #endif return *this ; } /******************************************************************* * Function Name: sortOutputMessage() * Description: analyze the Y msgs. Used by receive(const BasicOutputMessage *) * Note: There are 3 different cases: * 1. the Y msg will be received by a local simulator * (local simulator A -(Y)-> FC -(X)-> local simulator B) * -> translate the Y msg into an X msg and forward it to the local destination simulator * 2. 
the Y msg should be output to the environment * (local simulator -(Y)-> FC -(Y)-> NC -(Y)-> Root -> environment) * -> forward the Y msg directly to the NC (parentNCId) * 3. the Y msg will be received by a remote simulator * (local simulator A -(Y)-> FC -(Y)-> NC -(X)-> remote NC -(X)-> remote FC -(X)-> remote simulator B) * -> forward the Y msg directly to the NC (parentNCId) * All these cases may happen simultaneously for one Y msg. * Important: In case 2 and case 3, totally only ONE Y msg should be forwarded to the NC. The NC will take care of * the relay of the msg to the Root or remote NCs! *******************************************************************/ ParallelFlatCoordinator &ParallelFlatCoordinator::sortOutputMessage( const BasicOutputMessage &msg ) { MASSERTMSG( getLocalInfluList().size() == 0, "sortOutputMessage() -> Garbage in FC::getLocalInfluList()!" ); //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] sortOutputMessage at time " // << msg.time() << endl; bool hasSentToNC = false; //make sure there is only ONE Y msg will be sent to the NC const Port& destPort = msg.port(); BasicOutputMessage outMsg( msg ); //Y msg to be forwarded to the NC in case 2 & 3 outMsg.procId( id() ); BasicExternalMessage extMsg; //X msg to be forwarded to local simulators in case 1 extMsg.time( msg.time() ); extMsg.procId( id() ); extMsg.value( msg.value()->clone() ); extMsg.senderModelId( msg.port().modelId() ); //1> check whether the Y msg should be sent to the environment if( isSendingToEnvironment(destPort) ){ //send the Y msg to the NC without translating the port, the dest port will be set by the NC hasSentToNC = true; send( outMsg, parentNCId ); } //2> check whether the Y msg should be sent to remote simulator if( !hasSentToNC && isSendingToRemoteSimulator(destPort) ){ //send the Y msg to the NC without translating the port, the dest port will be set by the NC send( outMsg, parentNCId ); } //3> get the local ports that should receive the X msg 
//*** populate the localInfluList *** findLocalInfluenceList(destPort); //now "localInfluList" is loaded with ports on local atomic models InfluenceList::const_iterator cursor( getLocalInfluList().begin() ) ; //if there is a local receiving port, send the X msg to this port, and add the procId of the //receiving simulator to the synchronizeList; otherwise, send out no msgs! for( ; cursor != getLocalInfluList().end(); cursor++ ) { getSynchronizeList().insert( ((*cursor)->model()).localProc() ); extMsg.port( *(*cursor) ) ; send( extMsg,(*cursor)->model().localProc() ) ; //send the X msg to the local receiving port }//end for //*** clear the localInfluList *** getLocalInfluList().clear(); return *this; } //rollbackCheck() is commented out since all local defined variables in the FC have been //moved into the state. Oct. 19, 2005 /******************************************************************* * Function Name: rollbackCheck() * Description: Checks if a rollback has taken place and cleans local variables. * This is a virtual function defined in ParallelProcessor, we override it * here to clean our specific local variables * Note: this function is called in ParallelProcessor::executeProcess() ********************************************************************/ /* bool ParallelFlatCoordinator::rollbackCheck(const VTime& currentTime){ //1> call base class' version to clean the current MessageBag bool rollback = ParallelProcessor::rollbackCheck( currentTime ); //2> if rollback happened, clean other local variables if ( rollback ) { doneCount ( 0 ); getSynchronizeList().erase( getSynchronizeList().begin(), getSynchronizeList().end() ); getLocalInfluList().clear(); } return rollback; } */ /*public functions*/ /******************************************************************* * Function Name: initialize() * Description: Required by TimeWarp. 
* This function initializes doneCount, the lastChange() and * nextChange() of this FC, setup the local dependants for * the FC, clear the synchronizeList and localInfluList ********************************************************************/ void ParallelFlatCoordinator::initialize() { ParallelProcessor::initialize(); //call base class's initialize() doneCount(0); lastChange( VTime::Zero ); nextChange( VTime::Inf ); addLocalDependants(); //setup the local dependants (simulators) for this FC #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "------------------ FC initialize() -------------------" << endl << flush; printLocalDependants(jacky_os); #endif getSynchronizeList().clear(); getLocalInfluList().clear(); ParallelMainSimulator::Instance().debugStream() << "Initializing FlatCoordinator: Finished" << endl << flush; } /******************************************************************* * Function Name: allocateState() * Description: Required by TimeWarp. * This function create a new ParallelFlatCoordinatorState ********************************************************************/ BasicState* ParallelFlatCoordinator::allocateState(){ ParallelFlatCoordinatorState *state = new ParallelFlatCoordinatorState; state->dependants = new ParallelFlatCoordinatorState::DependantList; return state; } /*receive functions*/ /******************************************************************* * Function Name: receive(const InitMessage &) * Description: forwards the I msg to all local simulators, and sets doneCount ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const InitMessage &msg ) { //Send an init message to each of the local dependants. 
doneCount ( dependants().size() ); //doneCount = total number of local dependants ParallelFlatCoordinatorState::DependantList::const_iterator cursor; InitMessage init ( msg.time(), id() ); //Send to all local dependants for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) { this->send( init , cursor->first ); } #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "##### FC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(const DoneMessage &) * Description: decreases the doneCount, updates dependants' absoluteNext() time, * and sends a D msg to the NC ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const DoneMessage &msg ) { //There has to be (at least) one (D) msg pending MASSERTMSG( doneCount() > 0, "FC" + int2Str(id()) + ": Unexpected Done message!" ); //(D) msgs must be from Dependants (i.e., !isFromSlave()) MASSERTMSG( ( !msg.isFromSlave() ), "FC: (D) msg received from 'slave' instead of 'dependant'!" 
); doneCount( doneCount() - 1); //Update the depdendant's absoluteNext() time dependants()[msg.procId()] = msg.time() + msg.nextChange(); //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] FC receives (D) msg from procID: " // << msg.procId() << " at time=" << msg.time() << " / nextChange()=" << msg.nextChange() // << " / FC doneCount()=" << doneCount() << endl; if( doneCount() == 0 ) { lastChange ( msg.time() ); //new nextChange = MIN(all Local dependants' absoluteNext) - currentMsgTime nextChange( calculateNextChange( msg.time() ) ); DoneMessage doneMsg( msg.time(), id(), nextChange(), false ); //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] FC " << id() // << " send (D) msg to NC " << parentNCId << "at time " << msg.time() // << " / new nextChange()=" << nextChange() << endl ; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "[" << ParallelMainSimulator::Instance().getMachineID() << "] FC " << id() << " send (D) msg to NC " << parentNCId << "at time " << msg.time() << " / new nextChange()=" << nextChange() << endl << flush; #endif send( doneMsg, parentNCId ); } return *this; } /******************************************************************* * Function Name: receive(const InternalMessage &) * Description: this function processes the * msg received from the NC. * it calls sortExternalMessage() to send X msg in the FC's * MessageBag to local simulators, and sends * msg to those * local simulators that are either imminent or have just * received X msgs to trigger the Internal/External/Confluence * functions in the atomic model ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const InternalMessage &msg ) { MASSERTMSG( doneCount() == 0, "FC" + int2Str(id()) + ": Received a (*) msg and doneCount != 0!" ); //Nov. 
10, 2005 //we should set the lastChange() to the recvTime of the (*) msg lastChange( msg.time() ); //the nextChange() should not be changed, it will be updated when a (D) msg is received //1> sort the (X) msg in the MessageBag #ifdef JACKY_DEBUG //let's check the MessageBag of the FC ostream& jacky_os = JackyDebugStream::Instance().Stream(); int msgInBagCount = 1; jacky_os << "**** InternalMsg received in FC, MessageBag.size() = " << externalMsgs.size() << "****" << endl << flush; jacky_os << "MessageBag is: " << externalMsgs << endl << flush; #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- //check the FCBagReference in the FC's state jacky_os << endl << "[InfreqStateManager] -> STATE::FCBagRef.size() = " << getFCBagRef().size() << endl << flush; jacky_os << "[InfreqStateManager] -> STATE::FCBagRef is: " << flush; printFCBagRef(jacky_os); jacky_os << endl << flush; #endif //[2006-03-15] ------------------------------------------------------------------------------------------- jacky_os << "***Before FC sortExternalMessage(), getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; for(MessageBag::iterator extMsgs = externalMsgs.begin(); extMsgs != externalMsgs.end(); extMsgs++){ BasicExternalMessage* xMsg = (BasicExternalMessage*) (*extMsgs); string sendingModelName = SingleParallelModelAdm::Instance().model(xMsg->senderModelId()).description(); jacky_os << "*** " << msgInBagCount << ": " << xMsg->procId() << "@" << xMsg->time() << " sendingModel[" << sendingModelName << "] value=" << ((RealMsgValue*)(xMsg->value()))->v << " => " << (xMsg->port()).name() << "#" << (xMsg->port()).model().description() << endl << flush; } jacky_os << "****************************************************" << endl << flush; #endif for( MessageBag::iterator extMsgs = externalMsgs.begin(); extMsgs != externalMsgs.end(); extMsgs++ ){ sortExternalMessage( *((BasicExternalMessage*) (*extMsgs))); 
} //2> clear the MessageBag externalMsgs.eraseAll(); #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- //we also need to clear the FCBagRef getFCBagRef().clear(); #endif //[2006-03-15] ------------------------------------------------------------------------------------------- //3> send (*) msgs to all simulators whose procIds are in the synchronizeList ParallelFlatCoordinatorState::ProcSet::const_iterator cursor; InternalMessage internal( msg.time(), id() ) ; for( cursor = getSynchronizeList().begin(); cursor != getSynchronizeList().end() ; cursor++ ) { doneCount( doneCount() + 1); send( internal, *cursor ) ; } #ifdef JACKY_DEBUG jacky_os << "***After FC sortExternalMessage(), getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; jacky_os << "*** procId in the synchronizeList are: "; for(cursor = getSynchronizeList().begin(); cursor != getSynchronizeList().end() ; cursor++){ jacky_os << *cursor << " "; } jacky_os << "***" << endl << flush; if(doneCount() != 0){ jacky_os << "**************** doneCount=" << doneCount() << "******************" << endl << flush; } #endif //4> clear the synchronizeList getSynchronizeList().erase( getSynchronizeList().begin(), getSynchronizeList().end()); //5> If doneCount == 0, strange! Let's record this #ifdef JACKY_DEBUG if ( doneCount() == 0 ) { jacky_os << "****** ERROR: FC after sending (*) msgs, doneCount = 0! 
******" << endl << flush; } #endif //end JACKY_DEBUG #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG jacky_os << "##### FC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(const CollectMessage&) * Description: this function processes the @ msg received from the NC. * it finds out the imminent local simulators, and forwards * the @ msg to them, sets the synchronizeList and doneCount * accordingly ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const CollectMessage &msg ) { #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "*** FC entering receive(@), getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; ParallelFlatCoordinatorState::ProcSet::const_iterator syncursor; if(getSynchronizeList().size() != 0){ jacky_os << "*** procId in the getSynchronizeList() are: "; for(syncursor = getSynchronizeList().begin(); syncursor != getSynchronizeList().end() ; syncursor++){ jacky_os << *syncursor << " "; } } jacky_os << "***" << endl << flush; #endif //there must have some imminent local dependants! MASSERTMSG( msg.time() == absoluteNext(), "FC" + int2Str(id()) + ": Received a (@) msg and msgTime != absoluteNext!" ); lastChange( msg.time() ); nextChange( VTime::Zero ); //1> send (@) msg to all imminent local dependants ParallelFlatCoordinatorState::DependantList::const_iterator cursor; CollectMessage collect( msg.time(), id() ); for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) { if( cursor->second == msg.time() ) { //dependant's absoluteNext() = msgTime, i.e. 
imminent getSynchronizeList().insert( cursor->first ); #ifdef JACKY_DEBUG jacky_os << "*** FC receive(@msg) --> insert procId " << cursor->first << " into synchronizeList" << endl << flush; #endif doneCount( doneCount() + 1); send( collect , cursor->first ); }//end if } // end for #ifdef JACKY_DEBUG jacky_os << "*** FC leaving receive(@), getSynchronizeList().size()=" << getSynchronizeList().size() << "***" << endl << flush; if(getSynchronizeList().size() != 0){ jacky_os << "*** imminent dependants are: "; for(syncursor = getSynchronizeList().begin(); syncursor != getSynchronizeList().end() ; syncursor++){ jacky_os << *syncursor << " "; } } jacky_os << "***" << endl << flush; #endif #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG jacky_os << "##### FC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(const BasicOutputMessage *) * Description: this function processes the Y msg received from local dependants. 
* It finds out whether the Y msg should be output to the environment, * if so, the Y msg is forwarded to the NC; it also finds out whether * the Y msg is sending to a local simulator, in this case, it simply * translates the Y msg to an X msg and sends it to the local dest * simulators; If the Y msg is sending to remote simulators, the FC * forward the Y msg to the NC ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const BasicOutputMessage *msg ) { #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "*** FC receives (Y) at time " << msg->time() << " value " << (msg->value())->asString() << " to port " << (msg->port()).name() << "#" << (msg->port()).model().description() << endl; #endif sortOutputMessage( *msg ); delete msg; #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-02-23] skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG jacky_os << "##### FC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG #endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING [2006-02-23] return *this; } /******************************************************************* * Function Name: receive(const BasicExternalMessage *) * Description: this function checks the X msg to see whether the influenceList of the dest * port includes at least one local dependant, if so, the FC puts the (X) msg * in its MessageBag; otherwise, the FC will raise error (it got a wrong msg!) * This is redefined for a double check. Note: in ParallelProcessor::receive(X), * X msgs are added to the MessageBag unconditionally. ********************************************************************/ ParallelProcessor &ParallelFlatCoordinator::receive( const BasicExternalMessage *msg ) { MASSERTMSG( getLocalInfluList().size() == 0, "FC::receive(BasicExternalMessage*) -> Garbage in getLocalInfluList()!" 
); findLocalInfluenceList( msg->port() ) ; //all local receiving ports are loaded into the "localInfluList" #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "+++ FC: receive(X) -> getLocalInfluList() is: +++" << endl << flush; InfluenceList::const_iterator cursor( getLocalInfluList().begin() ) ; for( ; cursor != getLocalInfluList().end(); cursor++ ) { jacky_os << "\t (X) msg to ---> " << (*cursor)->name() << "@" << ((*cursor)->model()).asString() << endl << flush; } jacky_os << "+++ getLocalInfluList() END +++" << endl << flush; #endif MASSERTMSG( getLocalInfluList().size() > 0, "FC::receive(BasicExternalMessage*) -> Wrong (X) msg received in FC!" ); getLocalInfluList().clear(); //[2006-03-15] //Jacky Note: Before this function is called, ParallelProcessor has already add pair //to the FCBagReference in the FC's state if InfreqStateManager is defined! externalMsgs.add( msg ); #ifdef JACKY_DEBUG jacky_os << "*** \tFC receive(X) --> (X) msg received in FC, added to bag ***" << endl << flush; #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- jacky_os << endl << "[InfreqStateManager] current Bag is:" << endl << flush; jacky_os << externalMsgs << endl << flush; jacky_os << "[InfreqStateManager] current FCBagRef in state is:" << endl << flush; printFCBagRef( jacky_os ); jacky_os << endl << flush; #endif //[2006-03-15] ------------------------------------------------------------------------------------------- #endif //end JACKY_DEBUG // #ifdef JACKY_X_SKIP_SAVING // Nov. 
23, 2005 ############################################################### #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-06-19] //the same effect as the JACKY_X_SKIP_SAVING, but this is to compare the performance improvement //before and after using JACKY_MSG_TYPE_BASED_STATE_SAVING skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event #ifdef JACKY_DEBUG //------------------------------------------------------------------ jacky_os << "##### FC::skipStateSaving = TRUE #####" << endl << flush; #endif //end JACKY_DEBUG -------------------------------------------------------------- #endif //##################################################################################################### return *this; } #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- void ParallelFlatCoordinator::printFCBagRef( ostream &out ){ if( getFCBagRef().size() > 0 ){ ParallelFlatCoordinatorState::FCBagReference::const_iterator pos = getFCBagRef().begin(); for( ; pos != getFCBagRef().end(); pos++ ){ //print out the recvTime of the BasicEvent, and its address out << "<" << (pos->first).asString() << ", " << pos->second << "> " << flush; } out << endl << flush; } else { out << "-> EMPTY!" << endl << flush; } } //function to recover the FC's MessageBag based on fcbagref just before coast forward operations inline void ParallelFlatCoordinator::recoverProcessorVariables(){ //we need to get the BasicEvent* from the fcbagref in the current state, and add (X) Message objects //into the FC's MessageBag based on info. 
extracted from these BasicEvents #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[InfreqStateManager] FC::recoverProcessorVariables( ) called *************" << endl << flush; jacky_os << "\tcurrent Bag is: " << endl << flush; jacky_os << externalMsgs << endl << flush; jacky_os << "--------------------------------------------------------------------------------" << endl << flush; jacky_os << "\tFCBagRef in the currentState is: " << endl << flush; printFCBagRef( jacky_os ); #endif //Note: rollbackProcessorVariables(rollbackTime) will rollback the FCBag to the status just before rollbackTime //i.e. it's the status at the end of the time slice before TS //This status may not be the same as the status that should be recovered from the FCBagRef during coast forward //we need to empty the FCBag first! if( externalMsgs.size() > 0 ){ externalMsgs.eraseAll(); #ifdef JACKY_DEBUG jacky_os << endl << "[InfreqStateManager] FCBag cleaned!!! *************************" << endl << flush; #endif } //then, we recover the NCBag based solely on the BagRef restored from the state! 
//iterate over the FCBagRef, create BasicExternalMessage object and add them into FC's MessageBag if( getFCBagRef().size() > 0 ){ ParallelFlatCoordinatorState::FCBagReference::const_iterator pos = getFCBagRef().begin(); BasicEvent* externalMessagePtr; for( ; pos != getFCBagRef().end(); pos++ ){ externalMessagePtr = pos->second; //get the BasicEvent* TWMessage* extMsg = (TWMessage*)externalMessagePtr; //cast to TWMessage //create a new BasicExternalMessage object BasicExternalMessage *xmsg = ((TWExternalMessage *)extMsg)->getMessage(); //add this BasicExternalMessage object into the externalMsgs externalMsgs.add(xmsg); } //end for }//end if #ifdef JACKY_DEBUG jacky_os << endl << "[InfreqStateManager] FCBag is recovered from FCBagRef *************" << endl << flush; jacky_os << "\tcurrent Bag is: " << endl << flush; jacky_os << externalMsgs << endl << flush; jacky_os << "--------------------------------------------------------------------------------" << endl << flush; jacky_os << "\tFCBagRef in the currentState is: " << endl << flush; printFCBagRef( jacky_os ); #endif //At the end of the day, the NCBag & BagRef should contain the same number of elements MASSERTMSG( externalMsgs.size() == (int)getFCBagRef().size(), "NC::recoverProcessorVariables -> at the end, FCBag.size != FCBagRef.size" ); } #endif //[2006-03-15] -------------------------------------------------------------------------------------------