/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Oct. 26, 2005 *******************************************************************/ //-*-c++-*- #ifndef TIMEWARP_HH #define TIMEWARP_HH // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. 
//
//
// $Id: TimeWarp.hh,v 1.1.1.1 2007/03/15 15:45:05 rmadhoun Exp $
//
//---------------------------------------------------------------------------

// NOTE(review): the directive below has no header operand in this copy of the
// file (the operand appears to have been lost). The declarations further down
// use ofstream/ostream, so it is presumably a stream header -- TODO confirm
// against the original sources.
#include
#include "BasicTimeWarp.hh"
#include "DefaultVTime.hh"
#include "FileQueue.hh"
#include "InFileQueue.hh"
#include "KernelMsgs.hh"
#include "Stopwatch.hh"
#include "LogicalProcess.hh" // needed for MESSAGE_AGGREGATION,
                             // ONE_ANTI_MESSAGE, and for stats collection

// NOTE(review): this directive is also missing its header operand. The
// commented-out "set< VTime, less > rollbackCheckTimes" member below carries
// the same date, so it was presumably the set header -- TODO confirm.
#include //Nov. 21, 2005
#include "../../../JackyDebugStream.h"

//#include "SortedList.hh" // Needed for Lazy Cancellation

class BasicEvent;

// TimeWarp extends BasicTimeWarp with the declarations a simulation object
// needs here: the simulation entry points (executeSimulation/simulate), state
// saving, the garbage-collection steps called from GVTManager::gcollect(),
// event send/receive, rollback and (optionally) lazy cancellation.
//
// The class is abstract: executeProcess() is declared pure, so the
// application must derive from TimeWarp and supply it.
//
// Many members only exist under preprocessor switches (JACKY_STATE,
// JACKY_INFREQ_STATEMANAGER, JACKY_DEBUG, JACKY_RB_EXCEPTION, STATS, ...), so
// the layout of this class depends on the build configuration.
class TimeWarp : public BasicTimeWarp {
  // Both GVT managers are friends and may therefore use the protected and
  // private members declared below.
  friend class MatternGVTManager;
  friend class GVTManager;
public:
  TimeWarp();
  virtual ~TimeWarp();

  void executeSimulation();
  void simulate();
  void saveState(); // Save our current state.

  //------------------------------------------------------------------------------------
  //Jacky Note:
  //GVTManager::gcollect() calls the following steps, in this order:
  // 1. stateGcollect()
  // 2. stateClear()
  // 3. outputGcollect()
  // 4. CommManager::gcollect()
  // 5. inputGcollect()

  //[2006-03-20]
  //Jacky Note: This function is called by GVTManager::gcollect() as Step 5
  int inputGcollect(VTime, BasicEvent *ptr = NULL);

  //[2006-03-20]
  //Jacky Note: This function is called by GVTManager::gcollect() as Step 1
  // NOTE(review): "Container" is written without template arguments here and
  // in outputGcollect(); if Container is a class template in this code base,
  // the argument list is missing from this copy -- TODO confirm.
  virtual VTime stateGcollect(VTime, BasicEvent*&, Container*&);
  // One-argument overload; this default implementation simply returns ZERO.
  virtual VTime stateGcollect(VTime){ return ZERO; }

  //[2006-03-20]
  //Jacky Note: This function is called by GVTManager::gcollect() as Step 2
  void stateClear();
  void clearInitState();

  //[2006-03-20]
  //Jacky Note: This function is called by GVTManager::gcollect() as Step 3
  void outputGcollect(VTime, Container*&);
  void outputGcollect(VTime); // Dummy function. Don't call this one!!!
  //---------------------------------------------------------------------------------------

  virtual BasicEvent *getEvent(); // gets an event from the input queue
  virtual void sendEvent(BasicEvent *); // put an event on the output queue
  void recvEvent(BasicEvent *); // how to handle incoming messages
  BasicEvent* findPositionFromState(); // get outputQ position from the state

  // this calls the application to execute its code for one simulation cycle
  // Declared pure (= 0): a concrete subclass has to define it.
  // NOTE(review): a pure-specifier is only valid on a virtual function, so
  // this is presumably declared virtual in BasicTimeWarp -- confirm.
  void executeProcess() = 0;

  // the application overrides these two functions, if desired, to
  // contain code to be executed before and after simulation starts
  // and ends. The defaults provided here are empty.
  virtual void initialize() {};
  virtual void finalize() {};

  void finalGarbageCollectOutputQueue();
  void finalGarbageCollect();

  // this method is for the LP to query whether or not we are within
  // our time window...
  bool withinTimeWindow();

  // this method allows our time window to be set.
  void setTimeWindow(VTime newWindow);

  void setLPHandle(LogicalProcess* lpHandle);

  // this method returns the time of the earliest unexecuted event.
  VTime calculateMin();

  // returns this object's current simulation time
  // (read from the lVT field of the current state, state->current)
  inline VTime getLVT() const { return state->current->lVT; }

  void rollbackFileQueues(VTime time);

#ifdef JACKY_STATE //===========================================================
  //Jacky: this function will be defined in all processors to clean up
  // locally defined variables (mainly MessageBag & NCMessageBag)
  //Oct. 26, 2005
  virtual void rollbackProcessorVariables( const VTime& ) ;
#endif //end JACKY_STATE ====================================================

#if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ------------------
  //Jacky Note: this function is to recover the variables defined in processors
  // such as the NCMessageBag before the coast forward operations
  virtual void recoverProcessorVariables( );

  //Jacky Note: this function is to get the value of "suppressMessage"
  int getSuppressMessage();

  //Jacky Note: this function is to get the lVT of the LAST state saved before the given gVT
  VTime getLastStateTimeBefore( const VTime& gVT );
#endif //[2006-03-15] ----------------------------------------------------------------------------------

#ifdef JACKY_DEBUG
  //Jacky: this function will be defined in processors to show how locally defined
  // variables are cleaned in a rollback
  virtual void showLocallyDefinedVariables ( ostream& ) ;

  //this is for debugging rollbacks after "multiple straggler" and rollback to previous jump point
  //a set of VTime recording the time of the LAST state (dummy state) in the NC's stateQ
  //set< VTime, less > rollbackCheckTimes; //Nov. 21, 2005
  //bool isNC;
#endif //end JACKY_DEBUG

#if defined(OBJECTDEBUG) && !defined(LPDEBUG)
  // Opens the file through the base class, then points lpFile at outFile.
  void openFile() {
    BasicTimeWarp::openFile();
    lpFile = &outFile;
  };
#endif

  void setFile(ofstream *outfile);
#ifdef STATEDEBUG
  void setStateFile(ofstream *outfile);
#endif

#ifdef DC_DEBUG
  ofstream dcFile;
  void openDCFile(char* dcFilename);
#endif

  // data structures for file access
  int numOutFiles;
  int numInFiles;

  // We instantiate an out file queue for writing output to stdout. This
  // is done as follows: FileQueue is newed with the constructor having
  // the argument FileQueue::standardOut. This tells the file queue to
  // output all the data that it receives to stdout and not to a file.
  // The file id is also accessed as FileQueue::standardOut.
  FileQueue stdoutFileQ;
  FileQueue *outFileQ;
  InFileQueue *inFileQ;

  // Reasons for suppressing outgoing messages. The bracketed numbers are the
  // enumerator values that follow from the declaration order.
  enum MSGSUPPRESSION {
    NONE,           // No Message Suppression in effect.       [0]
    LAZYCANCEL,     // Lazy Cancellation Suppression.          [1]
    COASTFORWARD,   // Infrequent State Saving Suppression     [2]
    LAZYAGGRCANCEL  // For dynamic cancellation                [3]
  };

#if defined(ONE_ANTI_MESSAGE)
  //[2006-03-04] Jacky Note:
  //an array for identifying to which destination we have already sent [-] msg
  //1. array index is the destID;
  //2. alreadySentAntiMessage[destID] is the message to be cancelled on the outputQ, i.e. cancelEvent
  //if an [-] msg has not yet been sent to the destID, alreadySentAntiMessage[destID] = NULL;
  //otherwise, alreadySentAntiMessage[destID] = the cancelEvent from the outputQ
  BasicEvent **alreadySentAntiMessage;
#endif

  // Returns the LP handle previously stored in lpHandle.
  LogicalProcess* getLPHandle() const { return lpHandle; }

  void terminateSimulation(char *msg);

#if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION)
  VTime getLazyQMinTime();
  int getLazyQSize();
#endif

#ifdef STATS
  // Remembers the stats stream in fileHandle and passes the same handle on
  // to the state manager object (state).
  void setStatsFile (ofstream *statsfile) {
    fileHandle = statsfile;
    state->setStatsFile(fileHandle);
  };
#endif

protected:
  // rollback to the time specified
  virtual void rollback(VTime);

  // Flag to determine when to suppress a message
  // NOTE(review): presumably holds one of the MSGSUPPRESSION values declared
  // above -- confirm in the implementation file.
  int suppressMessage;

#ifdef JACKY_RB_EXCEPTION // Nov. 4, 2005 ============================================================
  //Jacky Note: this protected flag is to determine whether we should perform state
  // saving after an event has been executed
  // This is necessary when a positive straggler (X) message has been sent from the
  // NC to the FC (during processing the NCBag) that triggered rollbacks on the FC
  // as well as on all simulators and on the NC. Since the original (X) message received
  // by the NC that corresponds to this straggler message will be unprocessed during the
  // rollback, and the state of the NC will be restored to a previous time, we should not
  // save an extra state in this situation! Otherwise, this extra state will have an
  // outputPos pointing to this straggler message and the following rollbacks will fail to
  // bring the states of processors back to a consistent condition.
  // This flag has a default value of false, i.e. in normal situations, state will always be
  // saved. It will be set by the NC when it detects there are rollbacks triggered by a (X)
  // message sent to the FC in its receive(DoneMessage) function.
  //[2006-04-03] Jacky Note: This flag was originally added to solve the One-Straggler problem.
  //Now, it is part of the "2-level state saving strategy"!
  bool skipStateSaving;
#endif //end JACKY_RB_EXCEPTION =======================================================================

  virtual bool suppressLazyMessages(BasicEvent*);

#ifdef LAZYAGGR_CANCELLATION
  // Default implementation always answers false.
  virtual bool suppressMessageTest(BasicEvent*) { return false ;} ;
#endif

#if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION)
  // Queue used for lazy cancellation.
  OutputQueue lazyCancelQ;

  // Should we dispatch a message during lazy cancellation or not?
  // basically was the comparison a "hit" or a "miss"
  bool lazyCancel(BasicEvent*);

  // Invoked during lazy cancellation
  void moveMessagesToLazyQueue(const VTime);
#endif

  // Same as sendEvent(), but event is sent out without performing any checks.
  void sendEventUnconditionally(BasicEvent*);

private:
  Stopwatch stopWatch;

  // this defines how far this object can execute into the future.
  VTime timeWindow;

  // increments each time we send an event to produce a unique id for
  // events (the id, eventID tuple is unique)
  SequenceCounter eventCounter;

  // handle to this object's LP, to access terminateSimulation
  LogicalProcess* lpHandle;

  // for statistical use only
  int rollbackCount;

  bool timingsNeeded;

  void coastForward(VTime );

  // Invoked during aggressive cancellation
  void cancelMessagesAggressively(const VTime);

  // Used only during lazy cancellation
  void fillEventInfo(BasicEvent* );

  // Empty body: calling this does nothing.
  void cancelMessages(const VTime) {};

  // const data member without an in-class initializer, so it has to be
  // initialized by the constructor (definition not visible in this header).
  const int MAXDIFF;

#ifdef STATS
  SIGN lastMessageSign;
#endif

  // NEW_STATE_MANAGEMENT
public:
  // The necessary functions for the new style of state management.
  // In the new style, the templates will be ripped off and all the
  // state saving goes via the process. These functions let the user do this
  virtual void deAllocateState(BasicState*);

  // In the new style of state management, state->current will be protected.
  // and will not be available for access everywhere. If you need the current
  // state you have to use this method to get it and then you will have to
  // type cast it to the right state.
  // Returns the state manager pointer held in `state`.
  virtual STATE_MANAGER* getStateManager() const { return state; }

  // Convenience wrapper: asks the state manager for its current state.
  inline BasicState* getCurrentState() const {
    return getStateManager()->getCurrentState();
  }
private:
  // This method is the one that is going to be called by logical process
  // to initialize the simulation objects henceforth.
  void timeWarpInit();
};

#ifdef JACKY_STATE
//Jacky: this function will be defined in all processors to clean up
// locally defined variables (mainly MessageBag & NCMessageBag)
//Oct.
26, 2005 inline void TimeWarp::rollbackProcessorVariables( const VTime& ) {} #endif //end JACKY_STATE #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------- //Jacky Note: this function is to recover the variables defined in processors // such as the NCMessageBag before the coast forward operations //The Node Coordinator will create Message objects based on the BasicEvent* (which are //saved in bagref in the restored state) and put them in the NCBag before coast forward. inline void TimeWarp::recoverProcessorVariables( ){} //Jacky Note: this function is to get the value of "suppressMessage" inline int TimeWarp::getSuppressMessage(){ return suppressMessage; } //Jacky Note: this function is to get the lVT of the LAST state saved before the given gVT inline VTime TimeWarp::getLastStateTimeBefore( const VTime& gVT ){ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); if( (gVT == ZERO) || (gVT == PINFINITY) ){ cerr << "TW[" << id << "]::getLastStateTimeBefore() called with gVT = " << gVT.asString() << endl; abort(); } #endif //we need to find the LAST state on our current stateQ whose lVT is less than gVT //this is just like the restoreState() operation during rollbacks. BasicState *lastState; VTime lastStateTime; //get the stateQ for this simuObj, and call find() to get the LAST state before gVT //Since the find() operation will modify the findPos of the stateQ, we save the findPos before find() and //restore it to its original value after find(). This is for safty reason! state->getStateQueue()->saveFindPos(); //we always search from the tail, set findPos = NULL before search state->getStateQueue()->setFindPos(NULL); //the resulting lastState may be: // 1. the LAST state whose lVT < gVT // OR 2. 
NULL if all states on the stateQ have lVT >= gVT lastState = state->getStateQueue()->find(gVT, LESS); if(lastState == NULL){ //we cannot find any state with lVT < gVT lastStateTime = ZERO; } else { lastStateTime = lastState->lVT; } //restore the findPos of the stateQ state->getStateQueue()->restoreFindPos(); #ifdef JACKY_DEBUG //print out the current stateQ state->printQ(jacky_os); jacky_os << "\t->TW[" << id << "](localId=" << localId << ") => lastStateTime = " << lastStateTime.asString() << " (gVT = " << gVT.asString() << ")" << endl << flush; #endif return lastStateTime; } #endif //[2006-03-15] -------------------------------------------------------------------------------------- #ifdef JACKY_DEBUG //Jacky: this function will be defined in processors to show how locally defined // variables are cleaned in a rollback inline void TimeWarp::showLocallyDefinedVariables( ostream& ) {} #endif //end JACKY_DEBUG #endif