/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Sept. 6, 2005 *******************************************************************/ //-*-c++-*- #ifndef TIMEWARP_CC #define TIMEWARP_CC // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. // // $Id: TimeWarp.cc,v 1.1.1.1 2007/03/15 15:45:06 rmadhoun Exp $ // //--------------------------------------------------------------------------- #include "LogicalProcess.hh" #include "warpedNewHandler.hh" #include "SortedList.hh" #include #include "../../../JackyDebugStream.h" //for jacky-debug-mode #ifdef JACKY_DEBUG #include "../../../message.h" #include "../../../pmessage.h" #endif TimeWarp::TimeWarp() : stdoutFileQ(FileQueue::standardOut), MAXDIFF(1000) { state = new STATE_MANAGER(this); numInFiles = 0; inFileQ = NULL; numOutFiles = 0; outFileQ = NULL; eventCounter = 0; rollbackCount = 0; #ifdef JACKY_RB_EXCEPTION // Nov. 4, 2005 ============================================ skipStateSaving = false; #endif //end JACKY_RB_EXCEPTION ====================================================== //#ifdef JACKY_DEBUG // Nov. 21, 2005 //rollbackCheckTime = VTime::Zero; //isNC = false; //#endif //end JACKY_DEBUG printIt = false; // CurrentState gets set in the StateManager constructor set_new_handler(warpedNewHandler); timeWindow = PINFINITY; suppressMessage = NONE; // this is the default timingsNeeded = true; #ifdef MATTERNGVTMANAGER color = 0; // color initialized to white currentColorMessages = 0; // number of white messages initialized to zero #endif #ifdef ONE_ANTI_MESSAGE alreadySentAntiMessage = NULL; #endif } void TimeWarp::finalGarbageCollectOutputQueue() { Container* outputQptr = NULL; BasicEvent* inputQptr = NULL; stateGcollect(PINFINITY, inputQptr, outputQptr); outputGcollect(PINFINITY, outputQptr); } void TimeWarp::finalGarbageCollect() { BasicEvent* inputQptr = NULL; inputGcollect(PINFINITY, inputQptr); #ifdef STATS //collect stats lpHandle->lpStats.setTotalEventsCommitted(inputQ.getNumberOfCommittedEvents()); #endif // garbage collect file queues and close files register int i; for(i = 0; (i < numInFiles); i++) { inFileQ[i].gcollect(PINFINITY); } if (inFileQ != NULL) { delete [] inFileQ; inFileQ = NULL; } for(i = 0; (i < numOutFiles); i++) { outFileQ[i].gcollect(PINFINITY); } if (outFileQ != NULL) { delete [] outFileQ; outFileQ = NULL; } stdoutFileQ.gcollect(PINFINITY); } TimeWarp::~TimeWarp() { #ifdef REUSESTATS Container dummy; cerr << " " << name; dummy.printReuseStats(cout); #endif #if defined(INFREQSTATEMANAGER) || defined(LINSTATEMANAGER) || defined(NASHSTATEMANAGER) || defined(COSTFUNCSTATEMANAGER) #ifdef STATS *fileHandle << name << " state period = " << state->getStatePeriod() << endl; #endif #endif #ifdef OBJECTDEBUG *lpFile << "all done!" << endl; outFile.close(); #endif #ifdef DC_DEBUG dcFile.close(); #endif #ifdef STATEDEBUG *lpFile2 << " After garbage collection for PINFINITY (object) " << id << endl; outputQ.print(*lpFile2); #endif #ifdef STATEDEBUG *lpFile2 << " After garbage collection for PINFINITY (object) " << id << endl; outputQ.print(*lpFile2); #endif #ifdef ONE_ANTI_MESSAGE delete [] alreadySentAntiMessage; #endif delete state; if (name != NULL) { delete [] name; name = NULL; } } //Jacky: this function returns the event to be executed from the inputQ, and advance //lVT in the current state to the recvTime of this event BasicEvent* TimeWarp::getEvent() { #ifdef MESSAGE_AGGREGATION #ifndef ADAPTIVE_AGGREGATION lpHandle->incrementAgeOfAggregatedMessage(); #endif #endif BasicEvent *toExecute; // if a rollback needs to be performed, it should have happened before now // this is the object ID, and not the localID used by the singleQ. toExecute = inputQ.getAndSeek(id, localId); // toExecute is now either NULL or a valid message if (toExecute != NULL) { #ifdef STATS // collect stats lpHandle->simObjArray[localId].incrementEventsProcessed(); lpHandle->lpStats.incrementTotalEventsProcessed(); #endif //Jacky: advance lVT in the current state to the recvTime state->current->lVT = toExecute->recvTime; }//end toExecute != NULL return(toExecute); } //Jacky: this function send the given event to its destination //Whether sending the event or not depends on flag "suppressMessage", which is defined as a protected data in this class //flag "suppressMessage" can take one of the values given in enum type MSGSUPPRESSION: // 1. NONE [00] // No Supression (Aggresive Cancellation & save state every event) // 2. LAZYCANCEL [01] // Lazy Cancellation (send only if new event is different than the original one) // 3. COASTFORWARD [02] // Infrequent State Saving // 4. LAZYAGGRCANCEL [03] // Dynamic cancellation void TimeWarp::sendEvent(BasicEvent* toSend ) { bool suppressDecision; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); #endif //fill in the attributes of the event to be sent: sendTime = current->lVT, sender, alreadyProcessed = false FILL_EVENT_INFO(toSend); if (suppressMessage == NONE) { //1. this is the default case (Aggressive cancellation & Save state every event) suppressDecision = false; } else if (suppressMessage == COASTFORWARD) { //2. if infrequent state saving is used, don't send event in Coast Forward phase! suppressDecision = true; #ifdef JACKY_DEBUG jacky_os << "\t\t=> CoastForward: No Sending output!" << endl << flush; #endif } else { //3. if Lazy Cancellation or Dynamic Cancellation is used, sending the event or not // depends on function LAZY_CANCEL_TEST() // Function LAZY_CANCEL_TEST() can be: // a. false -> for Aggressive Cancellation // b. suppressLazyMessages() -> for Lazy Cancellation // c. suppressMessageTest() -> for Dynamic Cancellation (LAZYAGGRE_CANCELLATION) suppressDecision = LAZY_CANCEL_TEST(toSend); #ifdef JACKY_DEBUG if( suppressDecision == true ){ jacky_os << "\t\t=> LAZY_CANCEL: No Sending output!" << endl << flush; } #endif } if(suppressDecision == false) { sendEventUnconditionally(toSend); } else { delete [] (char *) toSend; } } //Jacky: this function fill in the attributes of the event before it is sent void TimeWarp::fillEventInfo(BasicEvent* eventToSend ) { eventToSend->sendTime = state->current->lVT; //sendTime eventToSend->sender = id; //sender eventToSend->alreadyProcessed = false; //alreadyProcessed } //Jacky: this function actually sends out the given event to its destination ([+] or [-]) void TimeWarp::sendEventUnconditionally(BasicEvent* toSend) { int receiver = toSend->dest; //destination #ifdef MATTERNGVTMANAGER toSend->color = color; if(toSend->dest != toSend->sender){ if(color == 0){ whiteMessages++; } else { redMessages++; tMin = MIN_FUNC(tMin,state->current->lVT); } } #endif if(toSend->sign == POSITIVE){ //for [+] message #ifdef STATS //collect stats lpHandle->simObjArray[localId].incrementPosSent(); #endif // finish filling in the event to send toSend->sendTime = state->current->lVT; toSend->eventId = eventCounter; //eventId, others have been filled in fillEventInfo() toSend->sender = id; toSend->alreadyProcessed = false; // increment our event counter eventCounter++; // now send the [+] message for real, and put it on the outputQ in case of rollback // grab the communication handle of the receiving object and send message send the [+] message #ifdef LPDEBUG *lpFile << name << ":sending : " << *toSend << endl; #endif #ifdef DC_DEBUG dcFile << name << ":sending : " << *toSend << endl; #endif // Hack Hack // pause event timing because we don't want to take into account the // time required to rollback a process on the same LP. //(This is due to recvEvent() being called for TW processes on the same LP //instead of CommManager::recvEvent() ). state->pauseEventTiming(); commHandle[receiver].ptr->recvEvent(toSend); state->resumeEventTiming(); //[2006-04-04] //Jacky Note: If JACKY_SINGLE_MACHINE is defined, we are running simulation on a single machine, //so we don't need to save outgoing events on the outputQ. #ifndef JACKY_SINGLE_MACHINE //[2006-04-04] // put the positive message on the output queue outputQ.insert(toSend); #endif //end JACKY_SINGLE_MACHINE //[2006-04-04] } else if( toSend->sign == NEGATIVE ) { //for [-] message #ifdef STATS // collect stats lpHandle->simObjArray[localId].incrementNegSent(); #endif // msg has been sent to us from the outputQ, and the sign has already // been set to negative. We just sent it off. // grab the communication handle of the receiving object and send message #ifdef LPDEBUG *lpFile << name << ":sending : " << *toSend << endl; #endif // Hack Hack // pause event timing because we don't want to take into account the // time required to rollback a process on the same LP. //(This is due to recvEvent being called for TW processes on the same LP //instead of CommManager::recvEvent() ). state->pauseEventTiming(); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t(-)(-)(-)(-) TW::sendEventUnconditionally() -> about to send NegEvent : " << *toSend << endl << flush; #endif commHandle[receiver].ptr->recvEvent(toSend); state->resumeEventTiming(); } else { cerr << "[" << lpHandle->getLPid() << "] TimeWarp::sendEventUnconditionally() -> sending messages with unknown sign! : " << *toSend << endl; } } //Jacky: This function do 2 things: // 1. call simulate() to execute an event // 2. call saveState() to save a state for checkpointing //if timingsNeeded, the 1st step is counted for the time to execute an event (stopWatch in AdaptStateManager) void TimeWarp::executeSimulation() { #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); #endif if (timingsNeeded) { state->startEventTiming(); simulate(); timingsNeeded = state->stopEventTiming(); } else { simulate(); } #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::totalNumberOfEventsExecuted++; //we have just executed one event #endif //end JACKY_STATISTICS //[2006-04-02] //[2006-03-13] //Jacky Note: // For debugging purpose, let's show the values of "timeAtLastCall", "statePeriod" & "periodCounter" // in InfreqStateManager #if defined(INFREQSTATEMANAGER) && defined(JACKY_DEBUG) jacky_os << endl << "TimeWarp::executeSimulation() -> before saving state:" << endl << flush; jacky_os << "\tInfreqStateManager: timeAtLastCall = " << (state->getTimeAtLastCall()).asString() << " / statePeriod = " << state->getStatePeriod() << " / periodCounter = " << state->getPeriodCounter() << endl << endl << flush; #endif //end INFREQSTATEMANAGER //end [2006-03-13] #ifndef JACKY_RB_EXCEPTION // original version ====================================== saveState(); //Jacky: this will save the current state into the stateQ #else // Nov. 4, 2005 modified by jacky --------------------------------------------- //[2006-04-04] //Jacky Note: If JACKY_SINGLE_MACHINE is defined, we are running simulation on a single machine, so NO state //needs to be saved. #ifndef JACKY_SINGLE_MACHINE //[2006-04-04] //JACKY_SINGLE_MACHINE is not defined, we need to save state according to the 2-level state saving //control mechanism. if( skipStateSaving == true ){ //if the NC set this flag to TRUE, we will skip state saving here! //reset the flag to FALSE & no state saving here! skipStateSaving = false; #ifdef JACKY_DEBUG jacky_os << endl << "((( skipStateSaving = true -> no saveState()!! )))" << endl << flush; //jacky_os << "(((((((( current stateQ is printed as follows: ))))))))" << endl << flush; //state->printQ(jacky_os); //jacky_os << "(((((((( the current state is: ))))))))" << endl << flush; //state->current->showStateContent(jacky_os); jacky_os << "(((((((( END ))))))))" << endl << flush; #endif #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::totalNumberOfStatesSkipped++; #endif //end JACKY_STATISTICS //[2006-04-02] } else { //normal situation, always do state saving saveState(); } #endif //end JACKY_SINGLE_MACHINE [2006-04-04] #endif //end JACKY_RB_EXCEPTION ======================================================== } void TimeWarp::simulate(){ #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::jackyWatch.start(); #endif //end JACKY_STATISTICS //[2006-04-02] #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); #endif VTime preExecute = getLVT(); int i; // inFile stuff--save the position in file before execution. // We need to look into the inputQ to determine what time we are about to execute at, // and the inFileQ to determine where we are before execution. for (i = 0; i < numInFiles; i++) { inFileQ[i].storePos(inputQ.get()->recvTime, inFileQ[i].access().tellg()); } //Jacky: set inputPos in the current state to the event we are just about to execute state->current->inputPos = inputQ.getCurrent(); //Jacky: this is defined in ParallelProcessor to receive various types of msgs, receive() functions called executeProcess(); #ifdef LPDEBUG *lpFile << name << ": Finished running timestamp " << getLVT() << " "; state->current->print(*lpFile); *lpFile << "\n"; #endif if (preExecute != getLVT() ) { iRanLastTime = true; } #ifdef LAZYCANCELLATION //------------------------------------------------------------------------------------------ // Flush the lazyCancelQ in case we have messages that are never regenerated //Jacky Note: //After executing each event, we need to flush all messages on the lazyCancelQ that have a sendTime //less than the recvTime of the event we have just executed, which is the lVT of the current state, //since the event we have just executed can only send out messages with sendTime = its recvTime. if((inputQ.testCurrentObj(this->localId) == false) && (lazyCancelQ.size() != 0)) { #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY]TW[" << this->id << "]::simulate() -> flush lazyQ ... " << endl << flush; jacky_os << "currentState->lVT = " << (state->current->lVT).asString() << endl << flush; jacky_os << "----------------- Current lazyQ ---------------------------" << endl << flush; lazyCancelQ.printOutputQ( jacky_os ); jacky_os << "----------------- lazyQ print end -------------------------" << endl << flush; #endif // send out negative messages BasicEvent *cancelEvent, *negEvent; cancelEvent = lazyCancelQ.seek(0, START); #ifdef LPDEBUG if((cancelEvent) && (cancelEvent->sendTime < rollbackTime)){ *lpFile << name << ":found : " << *cancelEvent << "\n" << name << ":rollbackTime : " << rollbackTime << endl; } #endif while ((cancelEvent != NULL) && (cancelEvent->sendTime < state->current->lVT)) { // the outputQ has the actual data that the pointers in the other simuObjs' // InputQ points to. To send out a negative message, we need to allocate a new one. int destination = cancelEvent->dest; negEvent = (BasicEvent *) new char[cancelEvent->size]; new(negEvent) BasicEvent(cancelEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "[3]Sending antimessage : " << *cancelEvent << endl; #endif #ifdef JACKY_DEBUG jacky_os << "TW[" << this->id << "] send [-] : " << endl << "\t" << (*negEvent) << endl << flush; #endif // remove the + event's container from the outputQ, and delete both // + and - messages--they've been handled fine on the receiving end, // as far as we know. lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "TW[" << this->id << "] after removal, lazyQ : " << endl << flush; lazyCancelQ.printOutputQ( jacky_os ); #endif sendEventUnconditionally(negEvent); if(commHandle[destination].ptr->isCommManager()){ delete [] ((char *) negEvent); delete [] ((char *) cancelEvent); } // remove moves the current pointer forward one spot cancelEvent = lazyCancelQ.get(); } //end while } //end if #ifdef JACKY_DEBUG else { jacky_os << endl << "[LAZY]TW[" << this->id << "]::simulate() -> need NOT flush lazyQ ... " << endl << flush; jacky_os << "----------------- Current lazyQ ---------------------------" << endl << flush; lazyCancelQ.printOutputQ( jacky_os ); jacky_os << "----------------- lazyQ print end -------------------------" << endl << flush; } #endif #endif // end of LAZYCANCELLATION ----------------------------------------------------------------------------------- #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::jackyWatch.stop(); BasicTimeWarp::totalTimeForEventExecution += BasicTimeWarp::jackyWatch.elapsed(); #endif //end JACKY_STATISTICS //[2006-04-02] } //Jacky: this function set outputPos, myLastOutput in the current state and then call state manager's saveState() void TimeWarp::saveState() { // Note: the outputPos can get 3 possible values // (i) No outputs were generated. Hence NULL // (ii) The pointer to the last event generated in this execution cycle // (iii) Last event generated in the previous state, since no event was generted in this execution cycle. state->current->outputPos = outputQ.getTail(); //Container< BasicState >* #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t****** TimeWarp[" << this->id <<"]::saveState() => set state->current->outputPos = " << outputQ.getTail() << flush; if(state->current->outputPos != NULL) { jacky_os << " [" << state->current->outputPos->object << "]" << flush; } else { jacky_os << " [NULL]" << flush; } jacky_os << endl; jacky_os << "\t set state->current->myLastOutput = true" << endl << flush; #endif state->current->myLastOutput = true; state->saveState(); //Jacky: this will copy the content of the current state & save the copy into the stateQ } //[2006-03-20] GVTManager::gcollect() Step 5 //GC for the inputQ ************************************************************************************ int TimeWarp::inputGcollect(VTime gtime, BasicEvent* inputQptr) { register int collected; #ifdef OBJECTDEBUG *lpFile << name << ":input gVT: " << gtime << endl; #endif for(register int i = 0; (i < numInFiles); i++) { inFileQ[i].gcollect(gtime); } #if defined(INFREQSTATEMANAGER) || defined(LINSTATEMANAGER) || defined(NASHSTATEMANAGER) || defined(COSTFUNCSTATEMANAGER) //for infrequent & adaptive state managers collected = inputQ.gcollect(gtime, localId); #else //for normal state manager collected = inputQ.gcollect(gtime, localId, inputQptr); #endif state->committedEvents(collected, rollbackCount); return collected; } //[2006-03-20] GVTManager::gcollect() Step 1 //GC for the stateQ ************************************************************************************** VTime TimeWarp::stateGcollect(VTime gtime, BasicEvent*& inputQptr, Container*& outputQptr) { #ifdef OBJECTDEBUG *lpFile << name << ":state gcollecting for gVT: " << gtime << endl; #endif #ifdef STATS if (gtime != PINFINITY) { lpHandle->simObjArray[localId].setStateQSize(state->queueSize(), state->timeDiff()); } #endif VTime tmpTime = state->gcollect(gtime, inputQptr, outputQptr); return(tmpTime); } //[2006-03-20] GVTManager::gcollect() Step 2 //GC for the stateQ ************************************************************************************** void TimeWarp::stateClear() { state->clear(); } void TimeWarp::clearInitState() { // Set the correct values in "current" state... state->current->outputPos = outputQ.getTail(); state->current->myLastOutput = true; } //[2006-03-20] GVTManager::gcollect() Step 3 //GC for the outputQ ************************************************************************************ void TimeWarp::outputGcollect(VTime gtime, Container*& outputQptr) { for(register int i = 0; (i < numOutFiles); i++) { outFileQ[i].gcollect(gtime); } #ifdef STATEDEBUG outputQ.gcollect(outputQptr, commHandle, state->getHeadMyLastOutput(), (StateManager*)&state); #else //if STATEDEBUG is not defined #if defined(INFREQSTATEMANAGER) || defined(LINSTATEMANAGER) || defined(NASHSTATEMANAGER) || defined(COSTFUNCSTATEMANAGER) //for infrequent & adaptive state managers outputQ.gcollect(gtime, commHandle); #else //for normal state manager outputQ.gcollect(gtime, outputQptr, commHandle, state->getHeadMyLastOutput()); #endif #endif //STATEDEBUG } // *************************************************************************** // Function: void rollback(VTime rollbackTime) // *************************************************************************** void TimeWarp::rollback(VTime rollbackTime) { #ifdef JACKY_DEBUG //$$$1 ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "\t===$$$1=== TimeWarp[" << this->id <<"]::rollback() BEGIN -> rollbackTime = " << rollbackTime.asString() << endl << flush; #endif // cerr << "TimeWarp[" << id << "]::RB " << (state->current->lVT).asString() // << " -> " << rollbackTime.asString() << endl; #ifdef STATS // collect stats lpHandle->simObjArray[localId].incrementRollbackCount(); lpHandle->simObjArray[localId].setRollbackDistanceTime((state->current->lVT - rollbackTime)); lpHandle->simObjArray[localId].setRollbackDistanceStates(state->statesRolledBack); if (lastMessageSign == POSITIVE) { lpHandle->simObjArray[localId].incrementNumberOfPosStragglers(); } #endif #ifdef LPDEBUG *lpFile << name << ":rollback : " << rollbackTime << " from " << state->current->lVT << endl; #endif // restore to the correct state VTime timeRestored = state->restoreState(rollbackTime); #ifdef JACKY_DEBUG //$$$2 jacky_os << "\t===$$$2=== TimeWarp[" << this->id <<"]::rollback() -> after state::restoreState(" << rollbackTime.asString() << ") called, return timeRestored = " << timeRestored.asString() << endl << flush; #endif #ifdef LPDEBUG *lpFile << name << ":stateRestored : " << *state->current << endl; state->current->print(*lpFile); *lpFile << endl; #endif if(timeRestored == PINFINITY) { cerr << "ERROR: " << name << " can't restore state during rollback to " << rollbackTime << " from LVT " << state->current->lVT << " (timeRestored == PINFINITY)" << endl; exit(-11); } //[2006-03-15] //Jacky Note: resetting state->current->lVT = timeRestored should be called before coastForward()! //coastForward() may advance current->lVT to some time greater than timeRestored! //e.g. timeRestored (the time when the last state was saved before the rollbackTime) is 200 //statePeriod = 3, thus the next state will be saved at time 600, now rollbackTime = 500 //thus we should rollback to the end of time 400, and the coast forward is from 200 to the end of 400 //After coast forward, current->lVT should be 400. We must NOT reset it to 200! // we need to set the lVT state->current->lVT = timeRestored; #ifdef JACKY_DEBUG //$$$3 jacky_os << "\t===$$$3=== TimeWarp[" << this->id <<"]::rollback() -> after set state->current->lVT = " << timeRestored.asString() << endl << flush; if(state->getStatePeriod() != -1) { jacky_os << endl << "[Before CoastForward] -------------- check current fileQ -----------------------" << endl << flush; for (int i = 0; i < numOutFiles; i++) { jacky_os << "\t\t ===== outFileQ[" << i << "] -> " << outFileQ[i].getOutFileName() << endl << flush; outFileQ[i].printFileQ(jacky_os); } jacky_os << endl << flush; jacky_os << "[Before CoastForward] ---------- check current local variables ---------------" << endl << flush; showLocallyDefinedVariables( jacky_os ); } #endif //---------------------------------------- Coast Forward Operations --------------------------------------------------- //Jacky: if the "statePeriod" defined in the state manager is not -1 (it's -1 in normal state manager), // then infrequent or adaptive state manager is used, we need to do coastForward()! if (state->getStatePeriod() != -1) { #ifdef JACKY_DEBUG jacky_os << endl << "==[statePeriod = " << state->getStatePeriod() << "]== TimeWarp[" << this->id <<"]::rollback() -> do coastForward() for timeRestored = " << timeRestored.asString() << endl << flush; #endif #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::jackyWatch.start(); #endif //end JACKY_STATISTICS //[2006-04-02] coastForward(timeRestored); #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::jackyWatch.stop(); BasicTimeWarp::totalTimeForCoastForward += BasicTimeWarp::jackyWatch.elapsed(); #endif //end JACKY_STATISTICS //[2006-04-02] #ifdef JACKY_DEBUG jacky_os << endl << "<> -------------- check current fileQ -----------------------" << endl << flush; for (int i = 0; i < numOutFiles; i++) { jacky_os << "\t\t ===== outFileQ[" << i << "] -> " << outFileQ[i].getOutFileName() << endl << flush; outFileQ[i].printFileQ(jacky_os); } jacky_os << endl << flush; jacky_os << "<> --------------- check current local variables --------------------" << endl << flush; showLocallyDefinedVariables( jacky_os ); #endif } //end coasting forward //--------------------------------------------------------------------------------------------------------------------- //roll back the file queues rollbackFileQueues(rollbackTime); #ifdef JACKY_DEBUG //$$$4 jacky_os << "\t===$$$4=== TimeWarp[" << this->id <<"]::rollback() -> after rollbackFileQueues(" << rollbackTime.asString() << ") called" << endl << flush; jacky_os << "\t===$$$5=== TimeWarp[" << this->id <<"]::rollback() -> before rollbackProcessorVariables()" << endl << flush; showLocallyDefinedVariables( jacky_os ); #endif #ifdef JACKY_STATE //================================================================= rollbackProcessorVariables(rollbackTime) ; //Oct. 26, 2005 #endif //end JACKY_STATE =========================================================== #ifdef JACKY_DEBUG //$$$5 jacky_os << "\t===$$$5=== TimeWarp[" << this->id <<"]::rollback() -> after rollbackProcessorVariables()" << endl << flush; showLocallyDefinedVariables( jacky_os ); #endif // 1. default TW configuration -> CANCEL_MESSAGES is cancelMessagesAggressively() // 2. LAZYCANCELLATION -> CANCEL_MESSAGES is moveMessagesToLazyQueue() // 3. LAZYAGGR_CANCELLATION -> CANCEL_MESSAGES is cancelMessages(), which does nothing as defined in TW.hh CANCEL_MESSAGES(rollbackTime); #ifdef JACKY_DEBUG //$$$15 jacky_os << "\t===$$$15=== TimeWarp[" << this->id <<"]::rollback() -> after cancelMessagesAggressively(" << rollbackTime.asString() << ")" << endl << flush; #endif #ifdef MESSAGE_AGGREGATION #ifdef STATS logicalProcessStats::increment_got_roll_back(); #endif //end STATS getLPHandle()->flushOutputMessages(); #endif //end MESSAGE_AGGREGATION #ifdef JACKY_DEBUG //$$$16 jacky_os << "\t===$$$16=== TimeWarp[" << this->id <<"]::rollback() -> END ================================" << endl << endl << flush; #endif } // end of rollback() // *************************************************************************** // Function: void cancelMessagesAggressively(const VTime rollbackTime) // *************************************************************************** void TimeWarp::cancelMessagesAggressively(const VTime rollbackTime) { // send out negative messages BasicEvent* cancelEvent; BasicEvent* negEvent; #ifdef JACKY_DEBUG //$$$6 ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t===$$$6=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() BEGIN -> rollbackTime = " << rollbackTime.asString() << endl << flush; #endif // while using infrequent state saving strategies we cannot get // cancelEvent info from state->current so we search the outputQueue // with rollbackTime as search parameter. #if defined(INFREQSTATEMANAGER) || defined(LINSTATEMANAGER) || defined(NASHSTATEMANAGER) || defined(COSTFUNCSTATEMANAGER) #ifdef JACKY_DEBUG //[2006-03-15] jacky_os << endl << "==[InfreqStateManager]== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> call outputQ.find(" << rollbackTime.asString() << ", >=)" << endl << flush; #endif cancelEvent = outputQ.find(rollbackTime, GREATEREQUAL); #ifdef JACKY_DEBUG //[2006-03-15] if(cancelEvent != NULL){ jacky_os << "\t== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> find cancelEvent = " << *cancelEvent << endl << flush; } else { jacky_os << "\t== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> find cancelEvent = NULL! no more cancellation!!!" << endl << flush; //let's see the current outputQ jacky_os << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; } jacky_os << "==[InfreqStateManager]== Set outputQ.currentPos to the cancelEvent!" << endl << endl << flush; #endif //endif JACKY_DEBUG outputQ.setCurrentToFind(); #else //normal state manager #ifdef JACKY_DEBUG //$$$7 jacky_os << "\t===$$$7=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> before findPositionFromState() " << endl << flush; #endif // Instead of doing the above search, we will try to get the outputQ position through the state. // Excluding the first state, outputPos in the state points to the last event created in the last simulation cycle. // So the event we need to undo is the one next to this one. cancelEvent = findPositionFromState(); #ifdef JACKY_DEBUG //$$$8 if(cancelEvent != NULL){ jacky_os << "\t===$$$8=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> find cancelEvent = " << *cancelEvent << endl << flush; } else { jacky_os << "\t===$$$8=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> find cancelEvent = NULL! no more cancellation!!!" << endl << flush; //let's see the current outputQ jacky_os << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; } #endif //endif JACKY_DEBUG #endif //endif Normal StateManager // at this point cancelEvent points to the first Event sent after this time. //The currentPos of the outputQ is set to point to that event so that iteration of the outputQ can be easily done. #ifdef LPDEBUG if((cancelEvent)&&(cancelEvent->sendTime < rollbackTime)){ *lpFile << name << ":found : " << *cancelEvent << "\n" << name << ":rollbackTime : " << rollbackTime << endl; } #endif #ifdef ONE_ANTI_MESSAGE //------------------------------------------------------------------------------------------ #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> using ONE_ANTI_MESSAGE " << endl << flush; #endif //we need to iterate over ALL simuObjs in the system int iterateUpTo = getLPHandle()->getTotalNumberOfObjects(); //total number of simuObjs //Now, cancelEvent points to the 1st event to be cancelled on the outputQ while(cancelEvent != NULL){ #ifdef MESSAGE_AGGREGATION #ifndef ADAPTIVE_AGGREGATION lpHandle->incrementAgeOfAggregatedMessage(); #endif #endif //end MESSAGE_AGGREGATION // the outputQ has the actual data that the pointers in the other sim. // objects' InputQ points to. To send out a negative message, we need // to allocate a new one. int destID = cancelEvent->dest; //if alreadySentAntiMessage[destID] != NULL, then we have already sent a [-] msg to that destination //thus we should not send [-] msg to that destination any more! if (alreadySentAntiMessage[destID] != NULL) { //already send [-] msg to destID #ifdef LPDEBUG *lpFile << "OAM-Supressed negative message for : " << *cancelEvent << endl; #endif //end LPDEBUG #ifdef JACKY_DEBUG jacky_os << "\t[OutputQ Msg id = " << cancelEvent->eventId << "] TimeWarp[" << this->id <<"] -> already send [-] msg to TimeWarp[" << destID << "]!!!" << endl << flush; #endif //if the destID is on a remote machine if (commHandle[destID].ptr->isCommManager()) { //simply destroy the current message on the outputQ delete [] (char *)outputQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "\t\tRemote: delete Container & BasicEvent!! outputQ.size = " << outputQ.size() << endl << flush; #endif #if defined(ONE_ANTI_MESSAGE) && defined(STATS) lpHandle->simObjArray[localId].incrementRemoteEventsSupp(); #endif } else{ //if the destID is a local one //we should not destroy the BasicEvent* on the outputQ. We just remove it from the outputQ; //the really BasicEvent still exists on the inputQ! outputQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "\t\tLocal: delete Container ONLY!! outputQ.size = " << outputQ.size() << endl << flush; #endif #if defined(ONE_ANTI_MESSAGE) && defined(STATS) lpHandle->simObjArray[localId].incrementLocalEventsSupp(); #endif } //end processing cancelEvent } else { //we have not yet sent [-] msg to this destID #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== TimeWarp[" << this->id <<"] -> NOT yet send [-] msg to TimeWarp[" << destID << "]!!!" << endl << flush; #endif //1. mark the array as "already sent" for destID alreadySentAntiMessage[destID] = cancelEvent; //2. remove cancelEvent's container from the outputQ, delete the container & //return the BasicEvent in it. outputQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "\t\tdelete Container & save cancelEvent in Array, outputQ.size = " << outputQ.size() << endl << flush; #endif } //end "already sent" or "not yet sent" //get the next cancelEvent from the outputQ cancelEvent = outputQ.get(); } // end while //after the while loop, the "alreadySentAntiMessage" contains all BasicEvent* pointers that we need to //send as [-] messages. The array may look like this: //[NULL, BasicEvent*, NULL, NULL, NULL, BasicEvent*, BasicEvent*, BasicEvent*, BasicEvent*] //since we move forward along the outputQ, the BasicEvent* recorded in this array is always the one that //has the smallest sendTime among all messages with destID as their destination! #ifdef JACKY_DEBUG jacky_os << endl << "\t===[One-Anti]=== TimeWarp[" << this->id <<"] -> alreadySentAntiMessage[] ==>" << endl << flush; for( int arr_index = 0; arr_index < iterateUpTo; arr_index++ ) { if(alreadySentAntiMessage[arr_index] != NULL){ jacky_os << "" << endl << flush; } } jacky_os << endl << flush; #endif //iterate over all elements of the "alreadySentAntiMessage" array for(int index2 = 0; (index2 < iterateUpTo); index2++){ //if the element is not NULL, send [-] msg based on the element if(alreadySentAntiMessage[index2] != NULL){ cancelEvent = alreadySentAntiMessage[index2]; int destination = cancelEvent->dest; negEvent = (BasicEvent*) new char[sizeof(BasicEvent)]; new(negEvent) BasicEvent((const BasicEvent *) cancelEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef JACKY_DEBUG jacky_os << "\t===[One-Anti]=== TimeWarp[" << this->id <<"] -> negEvent created!!! " << endl << (*negEvent) << endl << flush; #endif sendEventUnconditionally(negEvent); #ifdef STATS lpHandle->lpStats.incrementTotalEventsUndone();; lpHandle->simObjArray[localId].incrementEventsUndone(); #endif //for remote [-] messages, destroy the [-] msg! if(commHandle[destination].ptr->isCommManager()){ delete [] ((char *)negEvent); // There will be only one copy of negEvent, but cancelEvent // could exist in another processes inputQ on this LP. delete [] (char*)cancelEvent; } //reset the element of the array to NULL alreadySentAntiMessage[index2] = NULL; } // end sending ONE [-] msg } //end for //after the for loop, all anti-messages have been sent, only one for each destination! #ifdef LPDEBUG //this test will induce FMRs if the pts in the state are incorrect. stateCheck(); #endif #else // We are not using the ONE_ANTI_MESSAGE STUFF --------------------------------------------------------------- #ifdef JACKY_DEBUG //$$$9 jacky_os << "\t===$$$9=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> not using ONE_ANTI_MESSAGE " << endl << flush; #endif while (cancelEvent != NULL) { #ifdef MESSAGE_AGGREGATION #ifndef ADAPTIVE_AGGREGATION lpHandle->incrementAgeOfAggregatedMessage(); #endif #endif //end MESSAGE_AGGREGATION // the outputQ has the actual data that the pointers in the other simuObjs' InputQ points to. //To send out a negative message, we need to allocate a new one. int destID = cancelEvent->dest; negEvent = (BasicEvent*) new char[cancelEvent->size]; #ifndef JACKY_REVISION new(negEvent) BasicEvent(cancelEvent); #else //Jacky's version new(negEvent) BasicEvent((const BasicEvent *) cancelEvent); #endif negEvent->sign = NEGATIVE; negEvent->size = cancelEvent->size; #ifdef JACKY_DEBUG //$$$10 jacky_os << "\t===$$$10=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> negEvent created!!! " << endl << (*negEvent) << endl << flush; #endif #ifdef STATEDEBUG *lpFile2 << "sending " << endl; *lpFile2 << *negEvent << "Id " << id << endl; // sanity check - compares state outputPos with neg events state->checkEvent(outputQ.getCurrent(), 1); #endif // remove the [+] event's container from the outputQ, and delete both [+] and [-] messages // they've been handled fine on the receiving end, as far as we know. outputQ.removeCurrent(); #ifdef JACKY_DEBUG //$$$11 jacky_os << "\t===$$$11=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> after outputQ.removeCurrent() " << endl << flush; jacky_os << "--------------printing outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; #endif sendEventUnconditionally(negEvent); #ifdef JACKY_DEBUG //$$$12 jacky_os << "\t===$$$12=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> after sendEventUnconditionally(negEvent)" << endl << flush; #endif #ifdef STATS lpHandle->lpStats.incrementTotalEventsUndone();; lpHandle->simObjArray[localId].incrementEventsUndone(); #endif if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)cancelEvent; } // move the current pointer forward one spot cancelEvent = outputQ.get(); #ifdef JACKY_DEBUG //$$$13 if(cancelEvent != NULL){ jacky_os << "\t===$$$13=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> get next cancelEvent = " << *cancelEvent << endl << flush; } else { jacky_os << "\t===$$$13=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() -> No more cancelEvent ----! " << endl << flush; } #endif } //end while (cancelEvent != NULL) #endif // end of ONE_ANTI_MESSAGE --------------------------------------------------------------------------------- #ifdef JACKY_DEBUG //$$$14 jacky_os << "\t===$$$14=== TimeWarp[" << this->id <<"]::cancelMessagesAggressively() END" << endl << flush; #endif } //Jacky: this function is used to find the 1st cancelEvent when normal state manager is used BasicEvent* TimeWarp::findPositionFromState() { // Instead of doing a search, we will try to get the outputQ position through the state. //Excluding the first state, outputPos in the state points to the last event created in the last simulation cycle. // So the event we need to undo is the one next to this one. //Jacky: I don't understand this ??? // Note: Special Case : If the first state is the initial state (namely state saved at time ZERO or ZERO(1), // outputPos pointer points to last event created. Otherwise, if the first state is the first one after the // GVT update (not ZERO, the state right before gvt), outputPos pointer points to the next event or to the // first output event of the following cycle. Exception in this case: outputPos points to NULL, outputPos is // the first event in outputQ #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "\t\t=======a>====== TimeWarp[" << id << "]::findPositionFromState() BEGIN" << endl << flush; #endif BasicEvent *cancelEvent = NULL; if (state->current->outputPos == NULL) { if (outputQ.getHead() != NULL #ifdef JACKY_RB_ZERO // Nov. 7, 2005**************************************************************** //Note: we should not cancel the initial messages sending from the Root to all NCs! // i.e. messages on the outputQ of the ParallelRoot (id = 0) should not be sent // as anti-messages during rollbacks to "before zero" && ( id != 0 ) //not the ParallelRoot #endif // ****************************************************************************************** ) { #ifdef JACKY_DEBUG jacky_os << "\t\t=======b>====== TimeWarp[" << id << "]::findPositionFromState() -> state->current->outputPos == NULL" << " && outputQ.getHead() != NULL" << endl << flush; //cerr << ">>> Note: currentState->outputPos = NULL && outputQ.getHead() != NULL <<< " << endl; //cerr << ">>> cancelEvent is the head of the outputQ! <<<" << endl; jacky_os << "--------printing outputQ [procId:"<< id <<"]--------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "--------------- outputQ printed --------------------------" << endl << flush; #endif cancelEvent = outputQ.getHead()->object; #ifdef JACKY_DEBUG jacky_os << "\t\t=======c>====== TimeWarp[" << id << "]::findPositionFromState() -> cancelEvent = " << (*cancelEvent) << endl << flush; #endif outputQ.setCurrent(outputQ.getHead()); #ifdef JACKY_DEBUG jacky_os << "\t\t=======d>====== TimeWarp[" << id << "]::findPositionFromState() -> after outputQ.setCurrent( outputQ.getHead() ) " << endl << flush; jacky_os << "\t\t\t outputQ::currentPos = " << outputQ.getHead() << endl << flush; #endif } //end if outputQ.getHead() != NULL } else { //Jacky: state->current->outputPos != NULL if (state->current->outputPos->next != NULL) { #ifdef JACKY_DEBUG jacky_os << "\t\t=======e>====== TimeWarp[" << id << "]::findPositionFromState() -> " << endl << flush ; jacky_os << "\t\t\t state->current->outputPos = " << state->current->outputPos << " != NULL" << endl << flush; jacky_os << "\t\t&&\t state->current->outputPos->next = " << state->current->outputPos->next << "!= NULL" << endl << flush; jacky_os << "----------printing outputQ [procId:"<< id <<"]--------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; #endif cancelEvent = state->current->outputPos->next->object; #ifdef JACKY_DEBUG jacky_os << "\t\t=======f>====== TimeWarp["<< id <<"]::findPositionFromState() -> cancelEvent = " << (*cancelEvent) << endl << flush; #endif outputQ.setCurrent(state->current->outputPos->next); #ifdef JACKY_DEBUG jacky_os << "\t\t=======g>====== TimeWarp["<< id << "]::findPositionFromState() -> " << "after outputQ.setCurrent(state->current->outputPos->next) " << endl << flush; jacky_os << "\t\t\t outputQ::currentPos = " << state->current->outputPos->next << endl << flush; #endif } //end state->current->outputPos->next != NULL, i.e. there are events to be cancelled } //end state->current->outputPos != NULL #ifdef JACKY_DEBUG if(cancelEvent != NULL){ jacky_os << "\t\t=======h>====== TimeWarp["<< id << "]::findPositionFromState() -> return cancelEvent = " << (*cancelEvent) << endl << flush; } else { jacky_os << "\t\t=======h>====== TimeWarp["<< id << "]::findPositionFromState() -> return cancelEvent = NULL, nothing to cancel!!!" << endl << flush; } #endif return cancelEvent; } #if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION) //Jacky Note: //this function is called as "CANCEL_MESSAGES(rollbackTime)" in TW::rollback(rollbackTime) //By default, we use Aggressive-Cancellation, and "CANCEL_MESSAGES(rollbackTime)" is mapped //to TW::cancelMessagesAggressively(); //If LAZYCANCELLATION is defined, "CANCEL_MESSAGES(rollbackTime)" is mapped to this function! void TimeWarp::moveMessagesToLazyQueue(const VTime rollbackTime) { // send out negative messages BasicEvent *cancelEvent; #ifdef JACKY_LAZY_CANCELLATION //[2006-04-14] //Jacky Note: //we nned a negEvent for aggressive cancellation on intra-LP messages BasicEvent* negEvent; #endif //end JACKY_LAZY_CANCELLATION //[2006-04-14] if(state->getStatePeriod() != -1){ //Jacky Note: this is for InfreqStateManager cancelEvent = outputQ.find(rollbackTime, GREATEREQUAL); } else{ //Jacky Note: this is for StateManager cancelEvent = findPositionFromState(); } #ifdef DC_DEBUG if((cancelEvent)&&(cancelEvent->sendTime < rollbackTime)){ dcFile << name << ":found : " << *cancelEvent << "\n" << "rollbackTime : " << rollbackTime << endl; } #endif #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[LAZY] TW[" << this->id << "]::moveMessagesToLazyQueue(" << rollbackTime.asString() << ") called!" << endl << flush; if(cancelEvent != NULL){ jacky_os << "find cancelEvent = " << *cancelEvent << endl << flush; } else { jacky_os << "find cancelEvent = NULL! no more cancellation!!!" << endl << flush; //let's see the current outputQ jacky_os << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; } #endif //end JACKY_DEBUG if (cancelEvent != NULL) { #ifndef JACKY_LAZY_CANCELLATION //[2006-04-14] *************************** original version ************************** //Jacky: bit operation, suppressMessage = suppressMessage | LAZYCANCEL(0001) // Thus, the last bit of suppressMessage is set to 1 #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] current suppressMessage = " << suppressMessage << flush; #endif suppressMessage |= LAZYCANCEL; #ifdef JACKY_DEBUG jacky_os << "=> turn on lazy-cancel: suppressMessage = " << suppressMessage << endl << flush; jacky_os << "TW[" << this->id << "] -> Insert events into lazyCancelQ!" << endl << flush; jacky_os << "Before insertion, lazyQ ---------------------------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyQ printed -------------------------------"<< endl << flush; #endif #ifdef DC_DEBUG dcFile << name << ":Suppression Flag = " << suppressMessage << endl << name << ":Inserting events into Lazy Cancellation Queue" << endl; #endif //Jacky: this while loop will insert all [+] cancelEvent onto the lazyQ // rather than sending them out as [-] msg while (cancelEvent != NULL) { #ifdef DC_DEBUG dcFile << *cancelEvent << endl; #endif //insert the cancelEvent onto the lazyQ lazyCancelQ.insert(cancelEvent); // remove from the outputQueue outputQ.removeCurrent(); // move the current pointer forward one spot cancelEvent = outputQ.get(); } //end while #else //[2006-04-14] **************************************** new version ******************************************* //Jacky Note [2006-04-18]: //In this new version, we only move those inter-LP messages from the outputQ //to the lazyCancelQ. For intra-LP messages, we simply send them out as [-] //messages so that normal rollback can happen on the local node. //1. inter-LP message -> moved to lazyCancelQ //2. intra-LP message -> cancelled aggressively (aggressive cancellation) //a flag used to turn on LAZYCANCEL once bool lazyCancelSwitch = false; //loop over all cancelEvents while (cancelEvent != NULL) { int destID = cancelEvent->dest; if( commHandle[destID].ptr->isCommManager() ){ //1. inter-LP message if( lazyCancelSwitch ==false ){ //Jacky: bit operation, suppressMessage = suppressMessage | LAZYCANCEL(0001) // Thus, the last bit of suppressMessage is set to 1 #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY ON] current suppressMessage = " << suppressMessage << flush; #endif //when LAZYCANCEL is turned on, we will invoke lazyCancel() //in function suppressLazyMessages(), which is called by sendEvent() suppressMessage |= LAZYCANCEL; lazyCancelSwitch = true; #ifdef JACKY_DEBUG jacky_os << "=> turn on lazy-cancel: suppressMessage = " << suppressMessage << endl << flush; #endif } //end turn on LAZYCANCEL #ifdef JACKY_DEBUG jacky_os << "TW[" << this->id << "] -> Insert events into lazyCancelQ!" << endl << flush; jacky_os << "Before insertion, lazyQ ---------------------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyQ printed -------------------------"<< endl << flush; #endif //move inter-LP messages to the lazyCancelQ lazyCancelQ.insert(cancelEvent); outputQ.removeCurrent(); cancelEvent = outputQ.get(); } else { //2. intra-LP message //do aggressive cancellation negEvent = (BasicEvent*) new char[cancelEvent->size]; new(negEvent) BasicEvent((const BasicEvent *) cancelEvent); negEvent->sign = NEGATIVE; negEvent->size = cancelEvent->size; #ifdef JACKY_DEBUG jacky_os << "\tTimeWarp[" << this->id <<"] -> [-] created {" << (*negEvent) << "}" << endl << flush; #endif outputQ.removeCurrent(); //also advance the pointer to the next cancelEvent #ifdef JACKY_DEBUG jacky_os << "\tTimeWarp[" << this->id <<"] -> after outputQ.removeCurrent()" << endl << flush; jacky_os << "-------------- outputQ [procId:"<< id <<"] ---------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed ------------------------" << endl << flush; #endif sendEventUnconditionally(negEvent); //send out [-] msg #ifdef JACKY_DEBUG jacky_os << "\tTimeWarp[" << this->id <<"]-> after sendEventUnconditionally(negEvent)" << endl << flush; #endif cancelEvent = outputQ.get(); #ifdef JACKY_DEBUG if(cancelEvent != NULL){ jacky_os << "\tTimeWarp[" << this->id <<"]-> next cancelEvent = {" << (*cancelEvent) << "}" << endl << flush; } else { jacky_os << "\tTimeWarp[" << this->id <<"] -> No more cancelEvent!" << endl << flush; } #endif } //end if intra-LP message } //end while (cancelEvent != NULL) #endif //end JACKY_LAZY_CANCELLATION //[2006-04-14] ----------- end new version ------------- #ifdef JACKY_DEBUG jacky_os << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; jacky_os << endl << "After insertion, lazyQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyQ printed --------------------------" << endl << flush; #endif } //end if cancelEvent != NULL #ifdef JACKY_DEBUG else { //if cancelEvent = NULL //let's see the current lazyCancelQ jacky_os << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; } #endif //end JACKY_DEBUG // Reset the lazy cancellation queue to the beginning lazyCancelQ.seek(0,START); } #endif // end of LAZYCANCELLATION or LAZYAGGR_CANCELLATION // *************************************************************************** // Function: void recvEvent(BasicEvent *newEvent) // *************************************************************************** void TimeWarp::recvEvent(BasicEvent *newEvent) { #ifdef CHECKASSERTIONS if (state->getHead() != NULL) { //assert(newEvent->recvTime > state->getHead()->lVT); if (newEvent->recvTime <= state->getHead()->lVT) { cerr << "newEvent: " << *newEvent << "\n" << "Head of queue: " << *(state->getHead()) << "\n" << "Tail of queue: " << *(state->getTail()) << "\n" << "gVT = " << gVTHandle->getGVT() << "\n"; *lpFile << "newEvent: " << *newEvent << "\n" << "Head of queue: " << *(state->getHead()) << "\n" << "Tail of queue: " << *(state->getTail()) << "\n" << "gVT = " << gVTHandle->getGVT() << endl; if (gVTHandle->recoveringFromCheckpoint == true) { cout << "Recovering from checkpoint" << endl; *lpFile << "Recovering from checkpoint" << endl; } abort(); } } assert( (newEvent->sign == POSITIVE) || (newEvent->sign == NEGATIVE)); #endif #ifdef MESSAGE_AGGREGATION #ifndef ADAPTIVE_AGGREGATION (this)->lpHandle->incrementAgeOfAggregatedMessage(); #endif lpHandle->incrementAgeOfAggregatedMessage(); #endif bool inThePast; #ifdef LPDEBUG *lpFile << name << ":received : " << *newEvent << endl; #endif #ifdef MATTERNGVTMANAGER if(newEvent->dest != newEvent->sender){ if(newEvent->color == 0 ){ whiteMessages--; } } else { if(newEvent->color == 1){ redMessages--; } } #endif #ifdef STATS lastMessageSign = newEvent->sign; // collect stats if (newEvent->sign == POSITIVE) { lpHandle->simObjArray[localId].incrementPosRecv(); } else { lpHandle->simObjArray[localId].incrementNegRecv(); } #endif VTime newEventTime = newEvent->recvTime; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "\t==================== TimeWarp["<< id << "]::recvEvent() -> event recvTime = " << newEventTime.asString() << endl << flush; #endif // insert event into input queue inThePast = inputQ.insert(newEvent,localId); #ifdef JACKY_DEBUG jacky_os << "\t==================== TimeWarp["<< id << "]::recvEvent() -> event inserted to inputQ, inThePast ? " << inThePast << endl << endl << flush; #endif if (inThePast == true){ if(newEventTime < gVTHandle->getGVT()) { cerr << "Rolling back before gVT (" << gVTHandle->getGVT() << ") due to event at time: " << newEventTime << "\n" << *newEvent << endl; cerr << "*****************************************" << endl; inputQ.printlVTArray(); cerr << "*****************************************" << endl; abort(); } //[2006-04-05] //Jacky Note: //The following approach to measuring rollback time is NOT correct!!! //Rollbacks happen as a chain of cascade operations on a node: //TW1.recvEvent() // -> inputQ.insert() // -> inThePast = true, call TW1.rollback() // ->rollback operations & Cancel_Message // -> TW1's Cancel_Message will cause other TW objects to be rolled back and the chain // can go back to TW1 as well //When the "initiator" of the chain of rollbacks returns from its rollback() function, ALL TW objects //on this node have all been rolled back!!! //As a result, we cannot measure the time of rollbacks individually for each TW object! The initiator's //rollback already include all other TW objects' rollback operations on this node!!! //We should start the watch at the beginning of the initiator's rollback(). This watch should NOT be //started again before the followers' rollback(). And we should stop the watch at the end of the original //initiator's rollback(). This watch should NOT be stopped at the end of the followers' rollback()! //Here is the logic flow: // 1. insertion on initiator's inputQ returns true (inThePast = true) // 2. watch.start() // 3. all followers' rollbacks finish // 4. initiator's rollback finish // 5. watch.stop() //Thus, the elapsed time of the watch includes the total time for this round of rollback operations //on this node! // //To do this, we need a flag shared by all TW objects indicating that the watch has been started by the //initiator. We use the initiator's procId as the flag (called "rollbackInitiatorId") and define it in //class BasicTimeWarp as a static member so that all TW objects on this node can access it. It is //initialized to -1. Before we start the rollbackWatch, we check this id: // if rollbackInitiatorId == -1 // -> we are the initiator, just start the watch and set this flag to our id; // if rollbackInitiatorId != -1 // -> we are one of the followers, don't start the watch again and leave the flag alone. //After we returned from the rollback() function, we check this flag again before we stop the watch: // if our id != rollbackInitiatorId // -> we are not the initiator, don't stop the watch and leave the flag alone; // if our id == rollbackInitiatorId // -> we are the original initiator, stop the watch, update the total rollback time variable, // and reset the flag to -1. #ifdef JACKY_STATISTICS //[2006-04-05] ----------------------------------------------------------------------- if(BasicTimeWarp::rollbackInitiatorId == -1){ //we are the rollback initiator! BasicTimeWarp::rollbackInitiatorId = id; //set our id as the initiator's id #ifdef JACKY_DEBUG jacky_os << endl << "===== TimeWarp["<< id << "]::recvEvent() -> RB Initiator / " << " set rollbackInitiatorId = " << BasicTimeWarp::rollbackInitiatorId << endl << flush; #endif BasicTimeWarp::jackyRBWatch.start(); } #endif //end JACKY_STATISTICS //[2006-04-05] ----------------------------------------------------------------------- state->startRollbackTiming(); #if defined(MESSAGE_AGGREGATION) && defined(FIXED_SLOPE_WITH_ERROR) lpHandle->setRollingBackFlagInMessageManager(); #endif // rollback now ... rollback(newEventTime); #if defined(MESSAGE_AGGREGATION) && defined(FIXED_SLOPE_WITH_ERROR) lpHandle->resetRollingBackFlagInMessageManager(); #endif state->finishRollbackTiming(); #ifdef JACKY_STATISTICS //[2006-04-02] ------------------------------------------------------------------------ if(id == BasicTimeWarp::rollbackInitiatorId) { //we are the original initiator! BasicTimeWarp::jackyRBWatch.stop(); BasicTimeWarp::rollbackInitiatorId = -1; //reset the initiator's id to -1 #ifdef JACKY_DEBUG jacky_os << endl << "===== TimeWarp["<< id << "]::recvEvent() -> RB Initiator finished! / " << " reset rollbackInitiatorId = " << BasicTimeWarp::rollbackInitiatorId << endl << flush; #endif BasicTimeWarp::totalTimeForRollback += BasicTimeWarp::jackyRBWatch.elapsed(); } #endif //end JACKY_STATISTICS //[2006-04-02] ------------------------------------------------------------------------ } //end inThePast == true } // **** end of function recvEvent ******************************************* // ************************************************************************** // Function: void calculateMin() // ************************************************************************** VTime TimeWarp::calculateMin() { VTime retval = PINFINITY; // if we have an event to execute, that is what we want to use... BasicEvent *tempEvent = inputQ.get(); if( tempEvent != NULL ){ // we need to use the event we're about to execute retval = tempEvent->recvTime; } return retval; } // **** end of function calculateMin **************************************** // ************************************************************************** // Function: void withinTimeWindow() // ************************************************************************** bool TimeWarp::withinTimeWindow() { bool withinWindow = false; // this doesn't get an event for execution - it just checks the time BasicEvent *currentEvent = inputQ.get(); if((currentEvent) && ((currentEvent->recvTime - gVTHandle->getGVT()) < timeWindow)){ withinWindow = true; } else if((getLVT() - gVTHandle->getGVT()) < timeWindow) { // this condition is for source - like objects which may or may not be event driven. withinWindow = true; } return withinWindow; } //**** end of function withinTimeWindow ************************************* //Jacky: if LAZYCANCEL or LAZYAGGRCANCEL is used, return the result from function lazyCancel(); otherwise, return false! bool TimeWarp::suppressLazyMessages(BasicEvent* toSend) { bool suppressDecision = false; #if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION) //------------------------------------------- #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[LAZY] TW[" << this->id << "] suppressLazyMessages() called!" << endl << flush; jacky_os << "\tcurrent suppressMessage = " << suppressMessage << endl << flush; #endif if (suppressMessage & LAZYCANCEL) { //bit AND, LAZYCANCEL = 0001 //Jacky: if suppressMessage is LAZYCANCEL(0001) or LAZYAGGRCANCEL(0011), AND operation result != 0 #ifdef JACKY_DEBUG jacky_os << "\tsuppressMessage & LAZYCANCEL = " << (suppressMessage & LAZYCANCEL) << endl << flush; #endif suppressDecision = lazyCancel(toSend); #ifdef JACKY_DEBUG jacky_os << "\tcall lazyCancel() function! return supressDecision = " << flush; if(suppressDecision == true) { jacky_os << "TRUE!" << endl << flush; } else { jacky_os << "FALSE" << endl << flush; } #endif } #endif //end LAZYCANCELLATION || LAZYAGGR_CANCELLATION -------------------------------------------------------- return suppressDecision; } //**** end of function suppressLazyMessages ********************************* //Function: coastForward(timeRestored) ========================================================================== void TimeWarp::coastForward(VTime timeRestored) { BasicEvent *nextToExecute = NULL; BasicEvent *findEvent = NULL; // This needs to be optimized, so that timings are not taken for those algorithms that do not need them. if (timingsNeeded) { stopWatch.start(); } inputQ.saveCurrent(); #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); //cerr << "CF!" << endl; jacky_os << endl << "==== [CoastForward] TimeWarp[" << id << "] BEGIN" << endl << flush; if(inputQ.getCurrent() != NULL){ jacky_os << "\tsave inputQ.currentPos = " << inputQ.getCurrent() << " {" << *(inputQ.getCurrent()) << "}" << endl << flush; } else { jacky_os << "\tsave inputQ.currentPos = NULL" << endl << flush; } #endif // Peek at next message to execute after rollback is completed for this object. nextToExecute = inputQ.getObj(localId); //this is currentObj[localId] #ifdef JACKY_DEBUG jacky_os << endl << "==== [CoastForward] set nextToExecute = currentObj[localId] = " << flush; if(nextToExecute != NULL){ jacky_os << nextToExecute << " {" << *nextToExecute << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } #endif #ifndef JACKY_INFREQ_STATEMANAGER //[2006-03-15] --------------------------------------------------------------------- // find first message to re-apply during coasting forawrd // the objectId (id) is needed because the compare func for LTSF also uses the dest of the objects. // LocalId is needed to get this object's miniList. //Jacky: Find the 1st event on the inputQ whose recvTime >= timeResored // This is the 1st event of the TimeSlice whithin which a state has been saved & is just restored //[2006-03-15] findEvent = inputQ.find(timeRestored, id, GREATEREQUAL, localId); #ifdef JACKY_DEBUG jacky_os << endl << "==== [CoastForward] find 1st event with recvTime >= " << timeRestored.asString() << " => findEvent = " << flush; if(findEvent != NULL){ jacky_os << findEvent << " {" << *findEvent << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } #endif inputQ.setCurrentObjToFindObj(localId); #ifdef JACKY_DEBUG jacky_os << endl << "==== [CoastForward] set currentObj[id] = findObj[id] = " << flush; if(inputQ.getCurrentObj(localId) != NULL){ jacky_os << inputQ.getCurrentObj(localId) << " {" << *(inputQ.getCurrentObj(localId)) << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } if(findEvent != NULL){ jacky_os << endl << "==== [CoastForward] Skip the event for which we have already saved state! recvTime = " << (findEvent->recvTime).asString() << " ..." << endl << flush; } int skipEventCount = 0; #endif // if we have an event at this time then we should skip it because it has already been executed. //Jacky Note [2006-03-13] //Since we are NOT necessarily save state after processing the 1st event at this time, e.g. we may have //skipped several events via "skipStateSaving" flag, here we should skip all events with recvTime = //timeRestored until stateRestored->inputPos. //i.e. stateRestored->inputPos is the LAST event we should skip here, since the state recovered is exactly //this stateRestored which is saved after processing its "inputPos" event! //[2006-03-15] //1. Skip all events in range [findEvent, currentState->inputPos] //2. Coast forward the remaining events in the TimeSlice, // i.e. in range (currentState->inputPos, findEvent->recvTime = timeRestored] //In fact, we don't need to do skipping, just coast forward in step 2 if (findEvent != NULL && timeRestored == findEvent->recvTime) { findEvent = inputQ.getAndSeekObj(id,localId); #ifdef JACKY_DEBUG skipEventCount++; #endif } #ifdef JACKY_DEBUG jacky_os << endl << "==== [CoastForward] " << skipEventCount << " events have been skipped!" << endl << flush; #endif #else //[2006-03-15] ------------------------------------------------------------------------------------------------- //Jacky Note: //In this new version, the event corresponding to the state that we have just restored is retrieved directly //from state->current->inputPos. The next event after state->current->inputPos is the 1st event for our coast //forwarding operation. findEvent = state->current->inputPos; #ifdef JACKY_DEBUG jacky_os << endl << "->->-> [CoastForward] => findEvent = currentState->inputPos = " << flush; if(findEvent != NULL){ jacky_os << findEvent << " {" << *findEvent << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } #endif inputQ.setCurrentObjTo(localId, findEvent); //set currentObj[localId] to this event #ifdef JACKY_DEBUG jacky_os << endl << "->->-> [CoastForward] set currentObj[id] = findEvent = " << flush; if(inputQ.getCurrentObj(localId) != NULL){ jacky_os << inputQ.getCurrentObj(localId) << " {" << *(inputQ.getCurrentObj(localId)) << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } #endif //now, skip the current findEvent (which is the same as current currentObj[localId]) //and get the 1st event for coast forwarding if (findEvent != NULL && timeRestored == findEvent->recvTime) { inputQ.getAndSeekObj(id,localId); //this will advance currentObj[localId] to the next event } #ifdef JACKY_DEBUG jacky_os << endl << "====> [CoastForward] Skip the event for which we have saved state!" << endl << flush; #endif #endif //end JACKY_INFREQ_STATEMANAGER [2006-03-15] ------------------------------------------------------------------ findEvent = inputQ.getObj(localId); //set findEvent to the new currentObj[localId] //Jacky Note: Now, findEvent points to the 1st event for coast forward! #ifdef JACKY_DEBUG jacky_os << "->->-> [CoastForward] now, findEvent = " << flush; if(findEvent != NULL){ jacky_os << findEvent << " {" << *findEvent << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } jacky_os << endl << "==== [CoastForward] current suppressMessage = " << suppressMessage << flush; #endif suppressMessage |= COASTFORWARD; // turn on message suppression #ifdef JACKY_DEBUG jacky_os << "\t => turn on msg-suppression: suppressMessage = " << suppressMessage << endl << flush; #endif #ifdef LPDEBUG *lpFile << name << ":Suppression Flag = " << suppressMessage << endl; if ((findEvent != NULL) && (nextToExecute != NULL)) { *lpFile << name <<": Before Coast : lVT ="<< state->current->lVT << " looking at event\n" << name << ":" << *findEvent << "\n" << name << ":next event after coasting should be\n" << name << ":" << *nextToExecute << "\n"; } else { if (findEvent == NULL) { *lpFile << name << ":No event found!" << "\n"; } else { *lpFile << name <<": Before Coast : lVT ="<< state->current->lVT << " looking at event\n" << name << ":" << *findEvent << "\n"; } if (nextToExecute == NULL) { *lpFile << name << ":No end event!" << "\n"; } } #endif register int counter = 0; VTime timeAtLastCall = getLVT(); #ifdef JACKY_DEBUG jacky_os << endl << "record timeAtLastCall = getLVT() = " << timeAtLastCall.asString() << endl << endl << flush; jacky_os << endl << "->->-> Before CF, let's see the current state we have restored...." << endl << flush; state->current->showStateContent( jacky_os ); jacky_os << endl << "------- call TW::recoverProcessorVariables() before coast forward! --------" << endl << flush; #endif #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] --------------------------------- //[2006-03-15] //This function call is added to recover the NCBag from NCBagRef before doing coast forward! recoverProcessorVariables(); #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] ------------------------------------------ //Jacky: coast forward from "findEvent" to the event before "nextToExecute" while ((findEvent != NULL) && (nextToExecute != findEvent)) { #ifdef LPDEBUG *lpFile << name <<": Coasting :"<< findEvent->recvTime << " Event = " << *findEvent << "\n"; state->current->print(*lpFile); *lpFile << "\n"; #endif #ifdef JACKY_DEBUG jacky_os << endl << "->->-> [CoastForward] Process findEvent = " << findEvent << " {" << *findEvent << "}" << endl << flush; #endif if (timeAtLastCall != getLVT() ) { // ignore events at the same time in our count of the forward execution. counter++; #ifdef JACKY_DEBUG jacky_os << "\ttimeAtLastCall = " << timeAtLastCall.asString() << " / getLVT() = " << getLVT().asString() << " ==> set counter = " << counter << endl << flush; #endif } findEvent->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "\tset findEvent->alreadyProcessed = false! findEvent = {" << *findEvent << "}" << endl << flush; #endif // Set the input Queue to point at the message to use in coasting forward for this object. inputQ.setCurrentToCurrentObj(localId); #ifdef JACKY_DEBUG jacky_os << "\tset currentPos = currentObj[id] = " << flush; if(inputQ.getCurrent() != NULL){ jacky_os << inputQ.getCurrent() << " {" << *(inputQ.getCurrent()) << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } jacky_os << "\tCall executeProcess() .... " << endl << flush; #endif executeProcess(); #ifdef JACKY_DEBUG //after coast forward 1 event, let's see the content of the current state jacky_os << "\t\t-----> resulting current state is: " << endl << flush; state->current->showStateContent(jacky_os); jacky_os << "\t\t-----------------------------------" << endl << endl << flush; #endif #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::totalNumberOfEventsCoastedForward++; #endif //end JACKY_STATISTICS //[2006-04-02] findEvent = inputQ.getObj(localId); } //end while inputQ.restoreCurrent(); #ifdef JACKY_DEBUG jacky_os << endl << "==== [CoastForward] restore inputQ.currentPos = " << flush; if(inputQ.getCurrent() != NULL){ jacky_os << inputQ.getCurrent() << " {" << *(inputQ.getCurrent()) << "}" << endl << flush; } else { jacky_os << "NULL" << endl << flush; } jacky_os << endl << "==== [CoastForward] current suppressMessage = " << suppressMessage << flush; #endif suppressMessage &= ~COASTFORWARD; // turn message suppression off #ifdef JACKY_DEBUG jacky_os << "\t => turn off msg-suppression: suppressMessage = " << suppressMessage << endl << flush; jacky_os << endl << "->->->->->->->->->->->->-> CoastForward End! ->->->->->->->->->->->->->->" << endl << flush; #endif //this function does nothing for normal StateManager & InfreqStateManager state->coastedForwardEvents( counter ); if (timingsNeeded) { stopWatch.stop(); state->coastForwardTiming( stopWatch.elapsed() ); } #ifdef LPDEBUG *lpFile << name <<": finished coasting next event for this object is "; if (findEvent == NULL) { *lpFile << "NULL" << endl; } else { *lpFile << *findEvent << endl; } inputQ.printSmallQ(*lpFile,localId); #endif } //end of function coastForward =================================================================================== #if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION) #ifndef JACKY_LAZY_CANCELLATION //[2006-04-14] ************************* original version ************************* //Jacky: this function decides whether sending an event or not when lazy_cancellation or //lazy_aggr_cancellation is used. It actually sends out [-] message if necessary. bool TimeWarp::lazyCancel(BasicEvent* toSend) { bool suppressDecision = false; #ifdef DC_DEBUG int flag = 0; dcFile << name << ":Begin Lazy Cancel Phase" << endl; #endif #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[LAZY] TW[" << this->id << "] lazyCancel(toSend) called!" << endl << flush; jacky_os << "Begin Lazy Cancel Phase ........................................... " << endl << flush; jacky_os << endl << flush; //print current lazyCancelQ jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif BasicEvent *lcOutputEvent = NULL, *negEvent = NULL; lcOutputEvent = lazyCancelQ.get(); #ifdef JACKY_DEBUG jacky_os << "current lazyCancelQ size = " << lazyCancelQ.size() << endl << endl << flush; jacky_os << "Get the 1st event on lazyQ, lcOutputEvent = " << endl << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] -> " << *lcOutputEvent << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif // Cancel any messages between the last message processed in the outputQ & the current message to send. while ((lcOutputEvent != NULL) && (lcOutputEvent->sendTime < state->current->lVT)) { int destID = lcOutputEvent->dest; negEvent = (BasicEvent *) new char[lcOutputEvent->size]; new(negEvent) BasicEvent(lcOutputEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "Compared with : " << *toSend << endl; dcFile << "[1]Sending antimessage : " << *lcOutputEvent << endl; flag++; #endif lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "[LAZY] send [-] event -> " << endl << "\t" << (*negEvent) << endl << flush; #endif sendEventUnconditionally(negEvent); if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)lcOutputEvent; } lcOutputEvent = lazyCancelQ.get(); } //end while bool Hit = false; #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] now CancelQ size = " << lazyCancelQ.size() << endl << flush; jacky_os << "\tlcOutputEvent = " << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] -> " << *lcOutputEvent << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } if(toSend != NULL){ jacky_os << "\t[LAZY] toSend = {" << *toSend << "}" << endl << flush; } else { jacky_os << "\t[LAZY Strange!] toSend = {NULL}" << endl << flush; } #endif // Check messages at the current time for lazy cancellation. //Jacky Note: //loop over events that would have been cancelled if aggressive cancellation is used //only check those events with the SAME send time as the newly generated event (toSend) while ((lcOutputEvent != NULL) && (toSend->sendTime == lcOutputEvent->sendTime) && (Hit == false)) { #ifdef DC_DEBUG if (lcOutputEvent == NULL) { dcFile << name << ":lcOutputEvent = NULL" << endl; } else { dcFile << name << ":Comparing messages to send:\n"; dcFile << "comparing " << endl << *lcOutputEvent << " and " << endl << *toSend << endl; } #endif #ifdef JACKY_DEBUG jacky_os << "[LAZY Compare] lcOutputEvent = {" << *lcOutputEvent << "}" << endl << flush; #endif // This is a little wierd so lets be detailed. //We want to only compare the users' stuff and the destination, so in essence we want to ignore the eventId. SequenceCounter oldLcOutputEvent; oldLcOutputEvent = lcOutputEvent->eventId; lcOutputEvent->eventId = toSend->eventId; bool tempFlag = lcOutputEvent->alreadyProcessed; lcOutputEvent->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "\toldLcEventId = " << oldLcOutputEvent << " / set lcOutputEvent->eventId = " << toSend->eventId << endl << flush; jacky_os << "\toldLcEvent alreadyProcessed = " << flush; if( tempFlag == true ){ jacky_os << "TRUE" << flush; } else { jacky_os << "FALSE" << flush; } jacky_os << " / set lcOutputEvent->alreadyProcessed = false" << endl << flush; jacky_os << endl << "Call toSend->lazyCmp(lcOutputEvent) for a HIT or MISS ... " << endl << flush; #endif if (toSend->lazyCmp(lcOutputEvent)) { //Jacky Note: The basic implementation of lazyCmp() is given in TimeWarpBasicEvent.cc // It only compares 5 things between two BasicEvent: // i.e. (recvTime, sendTime, sender, dest, & sign) //However, for PCD++, we need to compare more infomation about these two BasicEvent! // 1. msgType (I, @, D, X, Y, *) // 2. msgValue if it is (X)/(Y) msg // 3. nextChange in (D) //The concrete implementation of lazyCmp() function must be defined in pmessage.h //for all kinds of PCD++ messages including // 1. TWMessage : BasicEvent // 2. TWInitMessage : TWMessage // 3. TWInternalMessage : TWMessage // 4. TWCollectMessage : TWMessage // 5. TWDoneMessage : TWMessage // 6. TWBasicExternalMessage : TWMessage // 7. TWExternalMessage : TWBasicExternalMessage, BasicMsgValue // 8. TWBasicOutputMessage : TWMessage // 9. TWOutputMessage : TWBasicOutputMessage, BasicMsgValue // Got one! Move event from Lazy Cancel Queue to output Queue. // restore the old eventId. lcOutputEvent->eventId = oldLcOutputEvent; lcOutputEvent->alreadyProcessed = tempFlag; #ifdef DC_DEBUG dcFile << "Lazy Hit : putting back into outputQ : " << endl << *lcOutputEvent << endl; flag++; #endif #ifdef JACKY_DEBUG cerr << "TW[" << this->id << "] Lazy HIT!" << endl; jacky_os << "[LAZY HIT] toSend == lcOutputEvent! / move lcOutputEvent from lazyQ to outputQ ..." << endl << flush; jacky_os << "\tset suppressDecision = true & Hit = true!" << endl << flush; #endif outputQ.insert(lcOutputEvent); lazyCancelQ.removeCurrent(); suppressDecision = true; Hit = true; #ifdef JACKY_DEBUG jacky_os << endl << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif } else { // If LAZY cancellation check failed lcOutputEvent->alreadyProcessed = tempFlag; #ifdef DC_DEBUG dcFile << "Lazy Miss " << " : " << *lcOutputEvent << endl; flag++; #endif // restore the old eventId. lcOutputEvent->eventId = oldLcOutputEvent; int destID = lcOutputEvent->dest; negEvent = (BasicEvent *) new char[lcOutputEvent->size]; new(negEvent) BasicEvent(lcOutputEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "Compared with : " << *toSend << endl; dcFile << "[4]Sending antimessage : " << *lcOutputEvent << endl; flag++; #endif lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "[LAZY MISS] toSend != lcOutputEvent / remove lcOutputEvent from lazyCancelQ ..." << endl << flush; jacky_os << "[LAZY] send [-] event -> " << endl << "\t" << (*negEvent) << endl << flush; //jacky_os << endl << "--------------Current outputQ [procId:"<< id <<"]---------------------" // << endl << flush; //outputQ.printOutputQ(jacky_os); //jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif sendEventUnconditionally(negEvent); if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)lcOutputEvent; } lcOutputEvent = lazyCancelQ.get(); } #ifdef DC_STATS dcFile << Hit << endl; #endif } //end while [loop all previously sent events with the same sendTime as the newly generated event, toSend] #ifdef DC_DEBUG if(flag == 0){ dcFile << "New case : " << *toSend << endl; lazyCancelQ.print(dcFile); } #endif if (lazyCancelQ.size() == 0) { // End of Lazy Cancellation phase. #ifdef JACKY_DEBUG jacky_os << "[LAZY] lazyQ.size = 0, End Lazy Cancel Phase ........................................... " << endl << flush; jacky_os << endl << "[LAZY] current suppressMessage = " << suppressMessage << flush; #endif suppressMessage &= ~LAZYCANCEL; #ifdef LPDEBUG *lpFile << name << ":Lazy Cancellation Queue Exhausted!" << endl; *lpFile << name << ":Suppression Flag = " << suppressMessage << endl; #endif #ifdef JACKY_DEBUG jacky_os << "=> turn off lazy-cancel: suppressMessage = " << suppressMessage << endl << flush; jacky_os << endl << "[LAZY] End of Lazy Cancellation Phase, lazyCancelQ.size = 0!" << endl << flush; #endif } else { // Restore Lazy Cancellation Queue to the first event for next call. lazyCancelQ.seek(0,START); lcOutputEvent = lazyCancelQ.get(); #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] lazyCancelQ.size = " << lazyCancelQ.size() << " > 0, wait for next call ..." << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; jacky_os << endl << "[LAZY] Now, lcOutputEvent = " << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] ->{" << (*lcOutputEvent) << "}" << endl << flush; } else { jacky_os << "NULL.............!" << endl << flush; } jacky_os << " Now, currentState->lVT = " << (state->current->lVT).asString() << endl << endl << flush; #endif // Not needed for all applications but you never know when u need this so it gets called always //Jacky Note: //this while loop is going to send [-] msgs for all messages on the lazyQ with the same time stamp! while ((lcOutputEvent != NULL) && (lcOutputEvent->sendTime == state->current->lVT)) { #ifdef JACKY_DEBUG jacky_os << "while .. lcOutputEvent = " << (*lcOutputEvent) << endl << flush; jacky_os << "\t\tlcOutputEvent->sendTime = state->current->lVT = " << (state->current->lVT).asString() << endl << endl << flush; #endif int destID = lcOutputEvent->dest; negEvent = (BasicEvent *) new char[lcOutputEvent->size]; new(negEvent) BasicEvent(lcOutputEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "[2]Sending antimessage : " << *lcOutputEvent << endl; #endif lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << endl << "send [-] event -> " << endl << "\t" << (*negEvent) << endl << flush; //jacky_os << endl << "--------------Current outputQ [procId:"<< id <<"]---------------------" // << endl << flush; //outputQ.printOutputQ(jacky_os); //jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif sendEventUnconditionally(negEvent); if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)lcOutputEvent; } lcOutputEvent = lazyCancelQ.get(); } //end while [sending all other msg as anti-msg] #ifdef JACKY_DEBUG jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif lazyCancelQ.seek(0,START); } //end if #ifdef DC_DEBUG dcFile << name << ":End Lazy Cancel Phase(suppressDecision = " << suppressDecision << endl; #endif #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] TW[" << this->id << "] lazyCancel(toSend) END! -> return suppressDecision = " << flush; if(suppressDecision == true){ jacky_os << "TRUE" << endl << flush; } else { jacky_os << "FALSE" << endl << flush; } #endif return (suppressDecision); } #else //not defined JACKY_LAZY_CANCELLATION end //[2006-04-14] ******************** new version ********************* //Jacky Note: //This is the new version for lazy cancellation bool TimeWarp::lazyCancel(BasicEvent* toSend) { bool suppressDecision = false; #ifdef DC_DEBUG int flag = 0; dcFile << name << ":Begin Lazy Cancel Phase" << endl; #endif #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[LAZY] TW[" << this->id << "] lazyCancel(toSend) called!" << endl << flush; jacky_os << "Begin Lazy Cancel Phase ........................................... " << endl << flush; jacky_os << endl << flush; //print current lazyCancelQ jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; #endif BasicEvent *lcOutputEvent = NULL, *negEvent = NULL; lcOutputEvent = lazyCancelQ.get(); #ifdef JACKY_DEBUG jacky_os << "current lazyCancelQ size = " << lazyCancelQ.size() << endl << endl << flush; jacky_os << "Get the 1st event on lazyQ, lcOutputEvent = " << endl << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] -> " << *lcOutputEvent << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } #endif // Cancel any messages between the last message processed in the outputQ & the current message to send. while ((lcOutputEvent != NULL) && (lcOutputEvent->sendTime < state->current->lVT)) { int destID = lcOutputEvent->dest; negEvent = (BasicEvent *) new char[lcOutputEvent->size]; new(negEvent) BasicEvent(lcOutputEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "Compared with : " << *toSend << endl; dcFile << "[1]Sending antimessage : " << *lcOutputEvent << endl; flag++; #endif lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "[LAZY] send [-] event -> " << endl << "\t" << (*negEvent) << endl << flush; #endif sendEventUnconditionally(negEvent); if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)lcOutputEvent; } lcOutputEvent = lazyCancelQ.get(); } //end while #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] now CancelQ size = " << lazyCancelQ.size() << endl << flush; jacky_os << "\tlcOutputEvent = " << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] -> " << *lcOutputEvent << endl << flush; } else { jacky_os << "NULL!" << endl << flush; } if(toSend != NULL){ jacky_os << "\t[LAZY] toSend = {" << *toSend << "}" << endl << flush; } else { jacky_os << "\t[LAZY] toSend = {NULL}" << endl << flush; cerr << "TW[" << this->id << "] lazyCancel(toSend) -> toSend = NULL!" << endl; abort(); } #endif // Check messages at the current time ONLY for inter-LP messages! //destination of the "toSend" message int toSendDestination = toSend->dest; if( commHandle[toSendDestination].ptr->isCommManager() ) { //1. inter-LP message! //Jacky Note: //loop over inter-LP messages that would have been cancelled if aggressive cancellation is used //only check those events with the SAME sendTime as the regenerated message (toSend) //this is the flag to ensure that we only get one hit per message //we will loop over all inter-LP messages sent out at the current time previously before the rollback //Once we get a HIT, we will set this flag to true and the remaining messages will not be compared any more. //That is, we will get the 1st message that matches our current message "toSend" bool Hit = false; while ((lcOutputEvent != NULL) && (toSend->sendTime == lcOutputEvent->sendTime) && (Hit == false)) { #ifdef DC_DEBUG if (lcOutputEvent == NULL) { dcFile << name << ":lcOutputEvent = NULL" << endl; } else { dcFile << name << ":Comparing messages to send:\n"; dcFile << "comparing " << endl << *lcOutputEvent << " and " << endl << *toSend << endl; } #endif #ifdef JACKY_DEBUG jacky_os << "[LAZY Compare] lcOutputEvent = {" << *lcOutputEvent << "}" << endl << flush; #endif // This is a little wierd so lets be detailed. //We want to only compare the users' stuff and the destination, //so in essence we want to ignore the eventId. SequenceCounter oldLcOutputEvent; oldLcOutputEvent = lcOutputEvent->eventId; lcOutputEvent->eventId = toSend->eventId; bool tempFlag = lcOutputEvent->alreadyProcessed; lcOutputEvent->alreadyProcessed = false; #ifdef JACKY_DEBUG jacky_os << "\toldLcEvent: eventId = " << oldLcOutputEvent << " / alreadyProcessed = " << flush; if( tempFlag == true ){ jacky_os << "TRUE" << flush; } else { jacky_os << "FALSE" << flush; } jacky_os << "\t => lcOutputEvent: set eventId = " << toSend->eventId << " / alreadyProcessed = false" << endl << flush; jacky_os << endl << "Call toSend->lazyCmp(lcOutputEvent) for a HIT or MISS ... " << endl << flush; #endif if (toSend->lazyCmp(lcOutputEvent)) { //A. HIT //Jacky Note: The basic implementation of lazyCmp() is given in TimeWarpBasicEvent.cc // It only compares 5 things between two BasicEvent: // i.e. (recvTime, sendTime, sender, dest, & sign) //However, for PCD++, we need to compare more infomation about these two BasicEvent! // 1. msgType (I, @, D, X, Y, *) // 2. msgValue if it is (X)/(Y) msg // 3. nextChange in (D) //The concrete implementation of lazyCmp() function must be defined in pmessage.h //for all kinds of PCD++ messages including // 1. TWMessage : BasicEvent // 2. TWInitMessage : TWMessage // 3. TWInternalMessage : TWMessage // 4. TWCollectMessage : TWMessage // 5. TWDoneMessage : TWMessage // 6. TWBasicExternalMessage : TWMessage // 7. TWExternalMessage : TWBasicExternalMessage, BasicMsgValue // 8. TWBasicOutputMessage : TWMessage // 9. TWOutputMessage : TWBasicOutputMessage, BasicMsgValue // Got one! Move event from Lazy Cancel Queue to output Queue. //Jacky Note [2006-04-18]: //For a lazy-cancel HIT -> we just move the inter-LP message back to the outputQ //and we will not send out the message again! //setting suppressDecision = true will allow function sendEvent() to delete the //regenerated message "toSend" // restore the old eventId. lcOutputEvent->eventId = oldLcOutputEvent; lcOutputEvent->alreadyProcessed = tempFlag; #ifdef DC_DEBUG dcFile << "Lazy Hit : putting back into outputQ : " << endl << *lcOutputEvent << endl; flag++; #endif #ifdef JACKY_DEBUG cerr << "TW[" << this->id << "] Lazy HIT!" << endl; jacky_os << "[LAZY HIT] move lcOutputEvent from lazyQ to outputQ ..." << endl << flush; jacky_os << "\tset suppressDecision = true & Hit = true!" << endl << flush; #endif outputQ.insert(lcOutputEvent); lazyCancelQ.removeCurrent(); suppressDecision = true; Hit = true; #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::totalNumberOfLazyHit++; #endif //end JACKY_STATISTICS //[2006-04-02] #ifdef JACKY_DEBUG jacky_os << endl << "--------------Current outputQ [procId:"<< id <<"]---------------------" << endl << flush; outputQ.printOutputQ(jacky_os); jacky_os << "----------------- outputQ printed --------------------------" << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]-----------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed ----------------------" << endl << flush; #endif } else { //B. MISS #ifdef DC_DEBUG dcFile << "Lazy Miss " << " : " << *lcOutputEvent << endl; flag++; #endif //Jacky Note: //For a lazy-cancel MISS -> we need to send the previous message (i.e. lcOutputEvent) //as a [-] message and set suppressDecision = false, which allows function sendEvent() //to send out the new event (toSend) as [+] message to its destination. // restore the old values lcOutputEvent->eventId = oldLcOutputEvent; lcOutputEvent->alreadyProcessed = tempFlag; int destID = lcOutputEvent->dest; negEvent = (BasicEvent *) new char[lcOutputEvent->size]; new(negEvent) BasicEvent(lcOutputEvent); negEvent->sign = NEGATIVE; negEvent->size = sizeof(BasicEvent); #ifdef DC_DEBUG dcFile << "Compared with : " << *toSend << endl; dcFile << "[4]Sending antimessage : " << *lcOutputEvent << endl; flag++; #endif #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::totalNumberOfLazyMiss++; #endif //end JACKY_STATISTICS //[2006-04-02] lazyCancelQ.removeCurrent(); #ifdef JACKY_DEBUG jacky_os << "[LAZY MISS] remove lcOutputEvent from lazyCancelQ ..." << endl << flush; jacky_os << "[LAZY] send [-] event -> " << endl << "\t" << (*negEvent) << endl << flush; //jacky_os << endl << "--------------Current outputQ [procId:"<< id <<"]--------------" // << endl << flush; //outputQ.printOutputQ(jacky_os); //jacky_os << "----------------- outputQ printed -------------------" << endl << flush; jacky_os << endl << "------------Current lazyCancelQ [procId:"<< id <<"]--------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed -------------------" << endl << flush; #endif sendEventUnconditionally(negEvent); if(commHandle[destID].ptr->isCommManager()){ delete [] ((char *)negEvent); delete [] (char*)lcOutputEvent; } lcOutputEvent = lazyCancelQ.get(); } #ifdef DC_STATS dcFile << Hit << endl; #endif } //end while [loop all lazyQ events with the same sendTime as that of the regenerated event, toSend] } //end if inter-LP message // else { //2. intra-LP message! //For all intra-LP messages, we don't need to do a HIT/MISS comparison, we simply let //suppressDecision to be false, which allows sendEvent() to send out the new event (toSend) //as [+] message to the LOCAL destination. // } #ifdef DC_DEBUG if(flag == 0){ dcFile << "New case : " << *toSend << endl; lazyCancelQ.print(dcFile); } #endif if (lazyCancelQ.size() == 0) { // End of Lazy Cancellation phase. #ifdef JACKY_DEBUG jacky_os << "[LAZY] lazyQ.size = 0, End Lazy Cancel Phase ........................................... " << endl << flush; jacky_os << endl << "[LAZY OFF] current suppressMessage = " << suppressMessage << flush; #endif suppressMessage &= ~LAZYCANCEL; #ifdef LPDEBUG *lpFile << name << ":Lazy Cancellation Queue Exhausted!" << endl; *lpFile << name << ":Suppression Flag = " << suppressMessage << endl; #endif #ifdef JACKY_DEBUG jacky_os << "=> turn off lazy-cancel: suppressMessage = " << suppressMessage << endl << flush; jacky_os << endl << "[LAZY] End of Lazy Cancellation Phase, lazyCancelQ.size = 0!" << endl << flush; #endif } else { // lazyCancelQ.size() != 0 // Restore Lazy Cancellation Queue to the first event for next call. lazyCancelQ.seek(0,START); lcOutputEvent = lazyCancelQ.get(); #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] lazyCancelQ.size = " << lazyCancelQ.size() << " > 0, wait for next call ..." << endl << flush; jacky_os << endl << "--------------Current lazyCancelQ [procId:"<< id <<"]---------------------" << endl << flush; lazyCancelQ.printOutputQ(jacky_os); jacky_os << "----------------- lazyCancelQ printed --------------------------" << endl << flush; jacky_os << endl << "[LAZY] Now, lcOutputEvent = " << flush; if(lcOutputEvent != NULL){ jacky_os << "[" << lcOutputEvent << "] ->{" << (*lcOutputEvent) << "}" << endl << flush; } else { jacky_os << "NULL.............!" << endl << flush; } jacky_os << " Now, currentState->lVT = " << (state->current->lVT).asString() << endl << endl << flush; #endif } //end if lazyCancelQ.size() != 0 #ifdef DC_DEBUG dcFile << name << ":End Lazy Cancel Phase(suppressDecision = " << suppressDecision << endl; #endif #ifdef JACKY_DEBUG jacky_os << endl << "[LAZY] TW[" << this->id << "] lazyCancel(toSend) END! -> return suppressDecision = " << flush; if(suppressDecision == true){ jacky_os << "TRUE" << endl << flush; } else { jacky_os << "FALSE" << endl << flush; } #endif return (suppressDecision); } #endif //end JACKY_LAZY_CANCELLATION //[2006-04-14] ********************* new version end *************************** #endif // end of LAZYCANCELLATION or LAZYAGGR_CANCELLATION //**** end of function lazyCancel =================================================================================== // NEW_STATE_MANAGEMENT void TimeWarp::timeWarpInit() { getStateManager()->createInitialState(); #ifdef MATTERNGVTMANAGER tMin = state->current->lVT; #endif initialize(); } void TimeWarp::outputGcollect(VTime) { // This method is written to stop CC from yelling that the other // outputGcollect method hides the virtual method in BasicTimeWarp. cerr << "This method must NEVER be called." << endl; abort(); } void TimeWarp::setTimeWindow(VTime newWindow) { timeWindow = newWindow; } void TimeWarp::setLPHandle(LogicalProcess* lpHandle2) { this->lpHandle = lpHandle2; #ifdef ONE_ANTI_MESSAGE if (alreadySentAntiMessage != NULL) { delete [] alreadySentAntiMessage; } //initialize the array to have a size of "total number of simuObjs" alreadySentAntiMessage = new BasicEvent*[lpHandle->getTotalNumberOfObjects()]; int iterateUpTo = getLPHandle()->getTotalNumberOfObjects(); //set all elements in the array to NULL for(int i = 0; (i < iterateUpTo); i++) { alreadySentAntiMessage[i] = NULL; } #endif } void TimeWarp::rollbackFileQueues(VTime time) { int i; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "CHECKHERE!!! ### TimeWarp[" << this->id <<"]::rollbackFileQueues(" << time.asString() << ") called! // numInfiles = " << numInFiles << " / numOutFiles = " << numOutFiles << endl << flush; #endif // Rollback input file queues for (i = 0; i < numInFiles; i++) { #ifdef JACKY_DEBUG jacky_os << "\t ### rollbackTo() called on inFileQ[" << i << "]" << endl << flush; #endif inFileQ[i].rollbackTo(time); } // Rollback output file queues for (i = 0; i < numOutFiles; i++) { #ifdef JACKY_DEBUG jacky_os << "\t ### rollbackTo() called on outFileQ[" << i << "] filename = " << outFileQ[i].getOutFileName() << endl << flush; #endif outFileQ[i].rollbackTo(time); } } void TimeWarp::setFile(ofstream *outfile) { lpFile = outfile; inputQ.setFile(outfile); state->setFile(outfile); outputQ.setFile(outfile); } #ifdef STATEDEBUG void TimeWarp::setStateFile(ofstream *outfile) { lpFile2 = outfile; state->setFile(lpFile2); inputQ.setStateFile(lpFile2); } #endif #ifdef DC_DEBUG void TimeWarp::openDCFile(char* dcFilename) { dcFile.open(dcFilename); } #endif void TimeWarp::terminateSimulation(char *msg) { ((FOSSIL_MANAGER *)gVTHandle)->terminateSimulation(msg); } #if defined(LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION) //----------------------------------------------- //Jacky: this function return the MIN sendTime among all events on the lazyQ VTime TimeWarp::getLazyQMinTime() { #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << endl << "[LAZY] TW[" << this->id << "] getLazyQMinTime() called!" << endl << flush; #endif if(lazyCancelQ.getHead() != NULL){ #ifdef JACKY_DEBUG jacky_os << "\tlazyCancelQ.head != NULL => MIN_time = head.sendTime = " << (lazyCancelQ.getHead()->object->sendTime).asString() << endl << flush; #endif return lazyCancelQ.getHead()->object->sendTime; } else{ #ifdef JACKY_DEBUG jacky_os << "\tlazyCancelQ.head = NULL => MIN_time = INFINITY!" << endl << flush; #endif return PINFINITY; } } int TimeWarp::getLazyQSize(){ return lazyCancelQ.size(); } #endif // end of LAZYCANCELLATION or LAZYAGGR_CANCELLATION ------------------------------------------------------- void TimeWarp::deAllocateState(BasicState* oldState) { delete oldState; } #endif