/******************************************************************* * Declaration: This class is partially based on Glinsky's version that * can be found in the "oldsource" directory. * * DESCRIPTION: class ParallelNodeCoordinator * * AUTHOR: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * DATE: Sept, 2005 * *******************************************************************/ #ifndef __PNCOORD_H #define __PNCOORD_H #include //Template list #include #include "event.h" //class event #include "eventlist.h" //class EventList #include "NCmsgbag.h" //class NCMessageBag #include "pncoordmodel.h" //class NodeCoordinatorModel #include "pprocess.h" //class Processor #include "pNCoordState.h" //class ParallelNodeCoordinatorState #include "pprocadm.h" //class ParallelProcessorAdmin #include "pmessage.h" #include "JackyDebugStream.h" //jacky-debug-mode #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] #include //multimap #endif //end JACKY_NCBAG_POINTERS //[2006-02-03] /** forward declarations **/ class Coupled ; class ParallelMainSimulator ; class Port ; class VTime ; class ParallelNodeCoordinator : public ParallelProcessor { public: //functions required by TimeWarp void initialize(); BasicState* allocateState(); //function to load external events defined in EV file into the NC (called by loadExternalEvents() //in the ParallelMainSimulator) //Important: this function tests whether the to-be-loaded external event will influence a LOCAL simulator. // It ONLY load external events that will later influence a local simulator (and thus, we need to // send them as (X) msgs to the local FC) !!! ParallelNodeCoordinator &addExternalEvent( const VTime &, const Port &, const Real & ) ; //function to get/set the stop time of the simulation ParallelNodeCoordinator &stopTime( const VTime & ) ; const VTime &stopTime() const ; /*Four receive functions*/ //The following message is not used in the PCD++ //ParallelProcessor &receive( const OutputSyncMessage &); //The following receive functions are not defined in the NC (thay will never be received by the NC) //ParallelProcessor &receive( const InternalMessage &) ; //ParallelProcessor &receive( const CollectMessage &); //this function forwards the (I) msg to the local FC ParallelProcessor &receive( const InitMessage & ) ; //this function is similar to the counterpart of the previous ParallelRoot. //Here, there are more things to consider: when sending (@) or (*) msgs in //this function, BOTH the external events AND the (X) msgs in the MessageBag //need to be considered! ParallelProcessor &receive( const DoneMessage &); //this function processes the Y msg received from the local FC. //It finds out whether the Y msg should be output to the environment. //if so, the Y msg is forwarded to the Root; it also finds out whether //the Y msg is sending to a remote simulator. if not, it raise error //(only msgs that need to be forwarded to remote machines should be //received by the NC). The NC checks what destination machines the msg //should be forwarded, and translates it to X msgs and relays them to remote NCs ParallelProcessor &receive( const BasicOutputMessage *) ; //this function receives (X) msgs coming from the other NCs. //it adds the coming (X) msgs into the NCMessageBag of this NC. These (X) msgs //will be forwared to the local FC in function receive( const DoneMessage &) //Note: msgs in the NCMessageBag may cause rollbacks! we need to check whether // a rollback has happened, and clean our own data accordingly. This is done // by function rollbackCheck() ParallelProcessor &receive( const BasicExternalMessage *) ; //function to retrieve the whole external events EventList &events(); const string description() const ; #ifdef JACKY_STATE //=========================================================== //Jacky: this function will be defined in all processors to clean up // locally defined variables (mainly MessageBag & NCMessageBag) //Oct. 26, 2005 virtual void rollbackProcessorVariables( const VTime& ) ; #endif //end JACKY_STATE ==================================================== #ifdef JACKY_DEBUG void showEvents( EventList &events, ostream &out = cout); //Jacky: this function will be defined in processors to show how locally defined // variables are cleaned in a rollback virtual void showLocallyDefinedVariables( ostream& ) ; #endif #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //a multimap is used to hold the for each (X) message in the NCBag // 1. When an (X) message is added into the NCBag, its recvTime along with the address // of the BasicEvent carrying this (X) message that has been executed on the inputQ // will be inserted into this multimap. // 2. When (X) messages are removed from the NCBag, including normal removal after sending // these messages and removal during rollbacks of the NCBag, the corresponding elements // in this multimap are removed as well. //Note: Always maintain the 1-to-1 correspondance between the (X) message in NCBag and the entry // in this multimap //[2006-03-15] //Jacky Note: This structure is moved to the state of the NC. // 1. Since NCBagReference contains VTime & address of the BasicEvent on the inputQ, // the BasicEvent* pointers will NOT be destroyed after sending out (X) messages // in the NCBag. The only situation where these pointers are invalid is when the // BasicEvent* is fossil collected by the GVTManager. // 2. The pointers in NCBagReference will be used during coast forward operations! // Before the coast forward operations, TW::recoverProcessorVariables() is called // to create (X) Message objects and insert them into the NCBag based on the content // of the NCBagReference in the restored state! //[2006-03-26] //Jacky Note: this structure should be here rather than in NC's state. The NC will use InfreqStateManager //as a normal StateManager and will not do coast forward operation! typedef multimap< const VTime, BasicEvent* > NCBagReference; //pair NCBagReference bagref; NCBagReference &getBagRef(); //function to get the bagref multimap void printNCBagRef( ostream &out ); //function to print out the NCBagReference //Note: //The pointers saved in the NCBag are NOT the address of the BasicEvents that carries the (X) message //The BasicEvent is execued, and the information about the (X) message it carries is extracted, then //a new BasicExternalMessage is allocated to hold this infomation. The address of this BasicExternalMessage //is put in the NCBag, while the address of the BasicEvent is lost after the processing. NC doesn't know //the BasicEvent on the inputQ from which the (X) message in the NCBag is generated. //Therefore, we use this multimap to maintian this connection. NC can check this NCBagReference to see //what's the BasicEvents from which all (X) messages in the NCBag with a given recvTime are derived. #endif //end JACKY_NCBAG_POINTERS [2006-02-03] /* //[2006-03-26] // Jacky Note: Since the NC will NOT do coast forward, we need not to define this function here. // The NC inherits recoverProcessorVariables() from TimeWarp class, which is empty. #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] //Jacky Note: this function is to recover the variables defined in processors // such as the NCMessageBag before the coast forward operations virtual void recoverProcessorVariables( ); #endif //[2006-03-15] */ protected: //Note: this variable is moved into the ParallelNodeCoordinatorState, Oct. 19, 2005 //typedef set ProcSet; //the set is used to record the procIds of remote NCs to which we need to send an (X) msg //Note: only ONE (X) msg will be sent to each remote NC //ProcSet remoteNCList; //function to retrieve the remoteNCList defined in the state, Oct. 19, 2005 ParallelNodeCoordinatorState::ProcSet &getRemoteNCList() const; //function to analyze the received Y msgs (from the FC). Used by receive(const BasicOutputMessage *) ParallelNodeCoordinator& sortOutputMessage( const BasicOutputMessage & ); //function to calculate the new MIN time for next msg (Collect or Internal message) /*Note: this will take into account the MIN time of the external * events, the X msgs in the NCMessageBag, and the absoluteNext() * time calculated from the (D) msg from the FC */ VTime calculateMinTime(); //Checks if a rollback has taken place and cleans local variables. //This is a virtual function defined in ParallelProcessor, we override it //here to clean our specific local variables //bool rollbackCheck(const VTime& currentTime); private: friend class ParallelProcessorAdmin ; friend class ParallelMainSimulator; #ifdef JACKY_DEBUG friend class ParallelNodeCoordinatorState; #endif ParallelNodeCoordinator( NodeCoordinatorModel*) ; // Default constructor //function to find all remote NCs that will receive an (X) msg, the procIds //of these remote NCs are stored in "remoteNCList" ParallelNodeCoordinator& findRemoteNCs(const Port&); //function to test whether the specified port will eventually connect to an output port of the TOP model bool isSendingToEnvironment(const Port&); //function to test whether an external event will influence a local simulator under the control of the local FC bool isSendingToLocalFC(const Port&); //NextMsgType management functions void sendMsgType( ParallelNodeCoordinatorState::NextMsgType msgType); ParallelNodeCoordinatorState::NextMsgType sendMsgType(); //EventList management functions EventList::iterator currentEvent(); void eventsBegin(); void eventsMoveNext(); void eventsMovePrev(); //Nov. 10, 2005 bool endOfEvents(); #ifdef JACKY_RB_EVENT //eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee //Nov. 13, 2005 //we need a data structure to hold the (X) msgs derived from the EventList & the following (*) msg // //Note: when the NC sends out external events to the FC, there could be several situations as follows: //CASE 1: event time = minTime = absoluteNext() -> i.e. external events happen when there are imminent cells // => in this case, the NC sends: // a> one/more (X) msgs derived from the EventList with recvTime = minTime = absoluteNext() // b> [one/more (X) msgs in the NCBag with recvTime = minTime = absoluteNext()] // c> a (@) msg with recvTime = minTime = absoluteNext() //CASE 2: event time = minTime != absoluteNext() -> i.e. external events happen when there is no imminent // => in this case, the NC sends: // a> one/more (X) msgs derived from the EventList with recvTime = minTime != absoluteNext() // b> [one/more (X) msgs in the NCBag with recvTime = minTime != absoluteNext()] // c> a (*) msg with recvTime = minTime != absoluteNext() // //Important: // The event (X) msg is: NC@sendTime -> FC@recvTiem, where sendTime is the lVT time for the NC, and // the recvTime is the time of the event. For example, if the NC sends an event with time 2000 at lVT // 1000, the event (X) msg is: NC@1000 -> FC@2000 // When rollback happeds on the NC, if rollbackTime = 2000 (i.e. rollback to the END of time 1000), // this event (X) msg will NOT be cancelled since it has a send time of 1000! // But if rollbackTime = 1000 (i.e. rollback to the END of 000), this event (X) msg will be cancelled // by the rollback mechanism in the TimeWarp kernel. // For this reason, we need to record the sendTime in the Event object! //private function for sending all external events with minTime in the EventList void sendMinTimeEventsToFC( const VTime &minTime ); //private function for sending all (X) messages with minTime in the NCBag //return the number of minTime (X) messages in the NCBag unsigned int sendMinTimeNCBagMsgsToFC( const VTime &minTime ); //function to check whether there is a @/* message with recvTime > the given minTime that has been //unprocessed on the current inputQ. This fucntion is used by receive(DoneMessage) to handle the //situation of multiple straggler messages after rollbacks BasicEvent* findUnprocessedCollectORInternalMessageAfter( const VTime& ); //function to find all messages that have been sent before saving a certain state list< Container* > findAllOutputMessagesForState( const BasicState* ); //function to find those events that are derived from external events from a list of pointers to Containers list< BasicEvent* > findMessagesDerivedFromExternalEventsFromContainers( const list< Container* >& ); #ifdef JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent //[2006-02-13] //function to find those events that are derived from external events from a list of pointers to BasicEvents //in multiple-straggler situations, if the NC find the stateFound has a NULL outputPos, i.e. the NC's stateQ //and outputQ have already been garbage collected, we need to find the messages derived from external events //from the inputQ instead of the NC's outputQ. list< BasicEvent* > findMessagesDerivedFromExternalEventsFromBasicEvents( const list< BasicEvent* >& ); //function to find all unprocessed event that were sent to the FC along with the futureCIEvent on the inputQ //this function searches the current inputQ to find all unprocessed events that have the following attributes: //1. event->recvTime = futureCIEvent->recvTime //2. event->sendTime = futureCIEvent->sendTime //3. event->alreadyProcessed = false //4. event->sender = this NC & event->receiver = the FC list< BasicEvent* > findAllFutureEvents( const BasicEvent* futureCIEvent ); //function to find all messages that have been sent before saving a certain state list< Container* > findAllOutputMessagesForFutureEvents( const list< BasicEvent* >& ); #endif //end JACKY_NC_GET_OUTPUT_MSG_FROM_futureCIEvent [2006-02-13] //function to roll back the eventCursor of the EventList void rollbackExternalEvents( VTime &eventTime ); #endif //eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee //Dormancy management functions void isDormant(bool); bool isDormant(); //function to check whether there are unprocessed events in the inputQ that have the same timestamp as //those (X) messages already in the NCMessageBag and before the stop time. //This function is used to identify the dormancy state of the NC bool hasUnprocessedEventWithSmallestTimeBeforeStop(); #ifdef JACKY_RB_EXCEPTION //================================================================================== //function to peek ahead the next event to be executed in the current inputQ //Nov. 3, 2005 BasicEvent* peekNextEventToBeExecuted(); //Nov. 3, 2005 //function to check whether there is already a @ message with the given recvTime that has not yet been //processed on the current inputQ. This fucntion is used by receive(DoneMessage) to handle exceptional //situations during rollbacks bool hasUnprocessedCollectMessageAt( const VTime& ); #endif //end JACKY_RB_EXCEPTION ============================================================================== //function to send out (X) messages in the bag followed by a (*) message to the FC. This function will set //lastChange to the time of the bag, and nextChange to 0. It will also set the nextMessageType to @. After //this function is invoked, the NCBag is properly cleaned void sortMessagesInBag(); //Note: This event cursor is moved into the state of the NC //Nov. 14, 2005 //EventList::iterator eventsCursor; //a pointer refers to the current event in the EventList VTime timeStop ; //time to stop the simulation specified on command line //Note: we should not put the external event list in the state of the NC EventList externalEvents; //all external events loaded from .EV file /*Note: We cannot put the NCMessageBag in the state of the NC. The reason is as follows: ** The NCMessageBag contains pointers to BasicPortMessages and these messages will ** be deleted after being processed. When there is a rollback, these (already deleted) ** messages cannot be recovered. Thus, if the NCMessageBag is rolled back to a previous ** state (contents), it will contain invalid pointers! Instead, we need a mechanism in ** the TimeWarp kernel to clear the NCMessageBag whenever there is a rollback. ** Oct. 26, 2005 */ NCMessageBag bag; //all (X) msgs received from other NCs //function to retrieve the NCbag, Oct. 18, 2005 NCMessageBag &getNCBag(); ProcId localFCId; //procId of the local FC (the only child of this NC) }; // class NodeCoordinator inline EventList &ParallelNodeCoordinator::events() { return externalEvents ; } inline ParallelNodeCoordinator &ParallelNodeCoordinator::stopTime( const VTime &t ) { timeStop = t ; return *this ; } inline const VTime &ParallelNodeCoordinator::stopTime() const { return timeStop ; } inline const string ParallelNodeCoordinator::description() const { return "ParallelNodeCoordinator" ; } /*EventList management functions*/ /******************************************************************* * Function Name: currentEvent() * Description: retrieve the current event iterator from the Node Coordinator State ********************************************************************/ inline EventList::iterator ParallelNodeCoordinator::currentEvent(){ return ((ParallelNodeCoordinatorState *)state->current)->eventsCursor; //return eventsCursor; } /******************************************************************* * Function Name: eventBegin() * Description: move the current event iterator to the first event ********************************************************************/ inline void ParallelNodeCoordinator::eventsBegin() { (((ParallelNodeCoordinatorState *)state->current)->eventsCursor) = externalEvents.begin(); //eventsCursor = externalEvents.begin(); } /******************************************************************* * Function Name: eventsMoveNext() * Description: move the current event iterator to the next event ********************************************************************/ inline void ParallelNodeCoordinator::eventsMoveNext() { ((ParallelNodeCoordinatorState *)state->current)->eventsCursor++; //eventsCursor++; } /******************************************************************* * Function Name: eventsMoveNext() * Description: move the current event iterator to the previous event * for event list rollbacks * Nov. 10, 2005 ********************************************************************/ inline void ParallelNodeCoordinator::eventsMovePrev() { ((ParallelNodeCoordinatorState *)state->current)->eventsCursor--; //eventsCursor--; } /******************************************************************* * Function Name: endOfEvents() * Description: test whether the end (position beyond the last event) of the event list is reached ********************************************************************/ inline bool ParallelNodeCoordinator::endOfEvents() { return ((((ParallelNodeCoordinatorState *)state->current)->eventsCursor) == externalEvents.end()); //return eventsCursor == externalEvents.end(); } /*functions required by TimeWarp*/ inline BasicState* ParallelNodeCoordinator::allocateState() { return new ParallelNodeCoordinatorState; } /*NextMsgType management functions*/ inline void ParallelNodeCoordinator::sendMsgType( ParallelNodeCoordinatorState::NextMsgType msgType) { ((ParallelNodeCoordinatorState *)state->current)->nextMsgType = msgType; } inline ParallelNodeCoordinatorState::NextMsgType ParallelNodeCoordinator::sendMsgType() { return ((ParallelNodeCoordinatorState *)state->current)->nextMsgType; } /*Dormancy management functions*/ inline void ParallelNodeCoordinator::isDormant(bool newVal){ ((ParallelNodeCoordinatorState *)state->current)->dormant = newVal; } inline bool ParallelNodeCoordinator::isDormant(){ return ((ParallelNodeCoordinatorState *)state->current)->dormant; } //function to retrieve the remoteNCList defined in the state, Oct. 19, 2005 inline ParallelNodeCoordinatorState::ProcSet& ParallelNodeCoordinator::getRemoteNCList() const { return ((ParallelNodeCoordinatorState *)state->current)->remoteNCList; } //function to retrieve the NCbag, Oct. 19, 2005 inline NCMessageBag& ParallelNodeCoordinator::getNCBag() { return bag; } #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] ---------------------- //function to get the "bagref" multimap inline ParallelNodeCoordinator::NCBagReference& ParallelNodeCoordinator::getBagRef() { //return ((ParallelNodeCoordinatorState *)state->current)->bagref; return bagref; } #endif //end JACKY_NCBAG_POINTERS [2006-02-03] ------------------ #ifdef JACKY_STATE //========================================================================================= /******************************************************************* * Function Name: rollbackProcessorVariables * Description: clean up locally defined variables during rollbacks * Specifically, we need to remove all messages in the NCMessageBag with * time >= the given rollbackTime * Oct. 26, 2005 ********************************************************************/ inline void ParallelNodeCoordinator::rollbackProcessorVariables( const VTime& rollbackTime ) { #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //the size of the NCBag should always be the same as the size of the NCBagReference MASSERTMSG( getNCBag().size() == (int)getBagRef().size(), "NC::rollbackProcessorVariables() -> 1. NCBag.size != BagRef.size" ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] if( getNCBag().size() > 0 ){ //remove all (X) messages in the NCBag with recvTime >= rollbackTime getNCBag().removeMessageWithTimeGreaterOrEqual( rollbackTime ); #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //remove all elements in Bagref with recvTime >= rollbackTime getBagRef().erase( getBagRef().lower_bound(rollbackTime), getBagRef().end() ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] } #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //After the removal, the size of the NCBag should still be the same as the size of the NCBagReference MASSERTMSG( getNCBag().size() == (int)getBagRef().size(), "NC::rollbackProcessorVariables() -> 2. NCBag.size != BagRef.size" ); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] } #endif //end JACKY_STATE =================================================================================== #ifdef JACKY_DEBUG /******************************************************************* * Function Name: showLocallyDefinedVariables * Description: show how locally defined variables are cleaned during * rollbacks for debugging purpose * Oct. 26, 2005 ********************************************************************/ inline void ParallelNodeCoordinator::showLocallyDefinedVariables( ostream& out ) { out << ">>>>NC[" << id() << "] showLocallyDefinedVariables(), NCMessageBag is =>" << endl << flush; //out << bag << endl << flush; out << getNCBag() << endl << flush; #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] out << ">>>>NC[" << id() << "] showLocallyDefinedVariables(), STATE::NCBagRef is =>" << endl << flush; printNCBagRef(out); out << endl << flush; #endif //end JACKY_NCBAG_POINTERS [2006-02-03] out << ">>>>NC[" << id() << "] showLocallyDefinedVariables(), eventList is =>" << endl << flush; showEvents(events(), out); } #endif //end JACKY_DEBUG #endif //__NODECOORDINATOR_H