/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Sept. 6, 2005 *******************************************************************/ //-*-c++-*- #ifndef LOGICAL_PROCESS_CC #define LOGICAL_PROCESS_CC // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. // // // $Id: LogicalProcess.cc,v 1.1.1.1 2007/03/15 15:45:06 rmadhoun Exp $ // //--------------------------------------------------------------------------- #include "LogicalProcess.hh" #include "SimulationTime.hh" #include #include #ifdef STATS #include #endif #include "../../../JackyDebugStream.h" //for jacky-debug-mode & jacky_rb_zero #ifdef JACKY_RB_ZERO //Nov. 7, 2005 #include "../../../parsimu.h" #endif //end JACKY_RB_ZERO //add these header files [2006-01-23] #include //[2006-01-23] sleep() #include //[2006-01-23] time(), ctime() #ifdef JACKY_DEBUG //----------------------------- #include #include #include "../../../strutil.h" #include "../../../message.h" //#include "../../../pmessage.h" #endif // end JACKY_DEBUG ------------------------ #include "../../../pmessage.h" class CommPhyInterface; extern CommPhyInterface *commLib; LogicalProcess::LogicalProcess(int totalNum, int myNum, int lpNum) #ifdef STATS : lpStats(myNum), comm(lpNum, commLib), scheduler(myNum) { #else : comm(lpNum, commLib), scheduler(myNum) { #endif int i; totalObjects = totalNum; numObjects = myNum; numLPs = lpNum; simulationFinished = false; initialized = false; numRegistered = 0; cycleCount = 0; getGVT = false; simArray = new ObjectRecord[totalObjects]; #ifdef STATS simObjArray = new simulationObjectStats[numObjects]; #endif BasicTimeWarp::inputQ.initialize(numObjects); // tell the commManager to initialize itself comm.initCommManager(simArray, &gVT, numLPs, this); id = comm.getID(); ifstream gvtPeriod; gvtPeriod.open("GVTPeriod"); if (!gvtPeriod.good()) { MAX_GVT_COUNT = 15000; gVT.gvtPeriod = MAX_GVT_COUNT; } else { gvtPeriod >> MAX_GVT_COUNT; gvtPeriod >> gVT.noOfGVTToSkip; gVT.gvtPeriod = MAX_GVT_COUNT; gvtPeriod.close(); } if(id == 0){ cout << "Using a period of " << MAX_GVT_COUNT << " to calculate GVT\n"; } #ifdef STATS if(FILE_OUTPUT){ char filename[20]; sprintf(filename,"LPSTATS%d",id); commitStatsFile.open(filename); if( !commitStatsFile.good()){ cerr << "LPSTATS " << id << " couldn't open logging file!" << endl; exit(-1); } } gVT.setStatFile(&commitStatsFile); #endif #if defined(LPDEBUG) || defined(OPENLPDEBUGFILE) char filenames[20]; sprintf(filenames,"LP%d",id); outFile.open(filenames); if(!outFile.good()){ cerr << "LP " << id << " couldn't open logging file!" << endl; exit(-1); } else { gVT.setFile( &outFile ); comm.setFile( &outFile ); #ifdef LPDEBUG scheduler.setFile( &outFile ); #endif } #ifdef MESSAGE_AGGREGATION gFileHandle = &outFile; #endif #endif #ifdef STATEDEBUG char statedebugFilename[20]; sprintf(statedebugFilename,"LPSTATE%d",id); outFile2.open(statedebugFilename); if(!outFile2.good()){ cerr << "LP " << id << " couldn't open logging file!" << endl; exit(-1); } #endif for (i = 0; i < totalObjects; i++) { simArray[i].ptr = NULL; simArray[i].lpNum = MAXINT; } #if !defined(MPI) && !defined(TCPLIB) && !defined(FWNS) if( numLPs > 1 && id == 0 ){ cerr << "Error! numLPs = " << numLPs << " but no message passing support compiled in!\n"; } #endif #ifdef DISPLAY_SIM_CONFIG if(id == 0){ //[2006-03-04] only show info on machine 0 printConfigInfo(); } #endif #if defined(INFREQ_POLLING) infreqInfile.open("infreqPolling.config"); if(!infreqInfile.good()) { cerr << "\n\nCould not open infreqPolling.config file !\n\n"; abort(); } infreqConfigCheck(); #ifdef INFREQ_TRACE char infreqTemp[30]; sprintf(infreqTemp,"./infreqTraceLP%d",id); infreqTraceFile.open(infreqTemp); if(!infreqTraceFile.good()){ cerr << "\n\nCould not open output file to write trace to \n\n" ; abort(); } #endif #endif #ifdef MCS_TRACE char infreqTempStats[30]; sprintf(infreqTempStats,"./MCS_TraceLP%d",id); mcsTraceFile.open(infreqTempStats); if(!mcsTraceFile.good()){ cerr << "\n\nCould not open output file to write MCS stats to \n\n" ; abort(); } #endif } LogicalProcess::~LogicalProcess() { #ifdef STATS if(FILE_OUTPUT){ printSimulationObjectStats(&commitStatsFile); } else { printSimulationObjectStats(&cout); } delete [] simObjArray; if(FILE_OUTPUT){ commitStatsFile.close(); } #endif delete [] simArray; #ifdef LPDEBUG outFile.close(); #endif #if defined(INFREQ_POLLING) infreqInfile.close(); #ifdef INFREQ_TRACE infreqTraceFile.close(); #endif #endif #ifdef MCS_TRACE mcsTraceFile.close(); #endif } #if defined(INFREQ_POLLING) void LogicalProcess::infreqConfigCheck(){ #if defined(INFREQ_FXD_INCREMENT) || defined(INFREQ_VAR_INCREMENT) infreqInfile >> pollMax >> pollDelta ; #elif defined(INFREQ_STATIC) infreqInfile >> pollStatic ; #else // user did not specify anything ; let default be INFREQ_VAR_INCREMENT infreqInfile >> pollMax >> pollDelta ; #endif cout <<"\n\nUsing INFREQUENT POLLING of the message communication subsystem\n"; #ifdef INFREQ_FXD_INCREMENT cout << "DYNAMIC Strategy with FIXED increment of " << pollDelta << "\nMAX polling frequency is " << pollMax << endl << endl ; #elif defined(INFREQ_STATIC) cout << "STATIC Strategy with polling frequency of " << pollStatic << endl << endl ; #else cout << "DYNAMIC Strategy with VARIABLE increment of " << pollDelta << " %" << "\nMAX polling frequency is " << pollMax << endl << endl ; pollDelta *= .01 ; #endif }; #endif #ifdef STATS void LogicalProcess::printSimulationObjectStats(ostream* statsFile){ *statsFile << "**** Time Warp Simulation Object Statistics *****" << endl; *statsFile << "--------------------------------------------------------------------------------" << endl; *statsFile << " Events\n"; *statsFile << "LP# Obj# Undone Sent+ve Sent-ve Recv+ve Recv-ve Strag+ve ProcMsg"; #ifdef ONE_ANTI_MESSAGE *statsFile << " MsgSupp" << endl; #else *statsFile << endl; #endif *statsFile << "--------------------------------------------------------------------------------" << endl; int i; int totalPosStragglers = 0; int totalEventsUndone = 0; int totalPosEventsSent = 0; int totalPosEventsRecv = 0; int totalNegEventsSent = 0; int totalNegEventsRecv = 0; int totalprocessedEvents = 0; for(i=0; i < numObjects; i++){ int numPosStragglers = simObjArray[i].getNumberOfPosStragglers(); int eventsUndone = simObjArray[i].getEventsUndone(); int numPosEventsSent = simObjArray[i].getPosSent(); int numPosEventsRecv = simObjArray[i].getPosRecv(); int numNegEventsSent = simObjArray[i].getNegSent(); int numNegEventsRecv = simObjArray[i].getNegRecv(); int processedEvents = simObjArray[i].getEventsProcessed(); totalPosStragglers += numPosStragglers; totalEventsUndone += eventsUndone; totalPosEventsSent += numPosEventsSent; totalPosEventsRecv += numPosEventsRecv; totalNegEventsSent += numNegEventsSent; totalNegEventsRecv += numNegEventsRecv; totalprocessedEvents += processedEvents; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << id << "] " ; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(3) << lpStats.eventIds[i] << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << eventsUndone << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << numPosEventsSent << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << numNegEventsSent << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << numPosEventsRecv << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << numNegEventsRecv << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << numPosStragglers << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << processedEvents << "] "; #ifdef ONE_ANTI_MESSAGE statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << simObjArray[i].getLocalEventsSupp() << "," << setw(2) << simObjArray[i].getRemoteEventsSupp() << "]" << endl; #else *statsFile << endl; #endif } *statsFile << "--------------------------------------------------------------------------------\n"; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << id << "] " ; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(3) << "###] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalEventsUndone << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalPosEventsSent << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalNegEventsSent << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalPosEventsRecv << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalNegEventsRecv << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalPosStragglers << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(7) << totalprocessedEvents << "] " << endl; *statsFile << "--------------------------------------------------------------------------------\n"; *statsFile << " Avg_RBD Avg_RBD MaxRBD MaxRBD Max Sz Avg Sz StateQ \n"; *statsFile << "LP # Obj# Rollbks (Time) (State) (Time) (State) StateQ StateQ Sampls\n"; *statsFile << "--------------------------------------------------------------------------------\n"; VTime totalAvgRollbackDistanceTime = 0; int totalAvgRollbackDistanceStates = 0; VTime totalMaxRollbackDistanceTime = simObjArray[0].getMaxRollbackDistanceTime(); int totalMaxRollbackDistanceStates = simObjArray[0].getMaxRollbackDistanceStates(); int totalRollbacks = 0; int totalMaxStateQSize = simObjArray[0].getMaximumStateQSize(); int totalAvgStateQSize = 0; int totalStateQSizeCalls = 0; for(i=0; i < numObjects; i++){ VTime avgRollbackDistanceTime = 0; int avgRollbackDistanceStates = 0; VTime maxRollbackDistanceTime = simObjArray[i].getMaxRollbackDistanceTime(); int maxRollbackDistanceStates = simObjArray[i].getMaxRollbackDistanceStates(); int numRollbacks = simObjArray[i].getRollbackCount(); int maxStateQSize = simObjArray[i].getMaximumStateQSize(); int avgStateQSize = simObjArray[i].getAvgStateQSize(); int stateQSizeCalls = simObjArray[i].getNumStateQSizeCalls(); VTime totalRollbackDistanceTime = simObjArray[i].getRollbackDistanceTime(); int totalRollbackDistanceStates = simObjArray[i].getRollbackDistanceStates(); if(totalMaxRollbackDistanceTime < maxRollbackDistanceTime) { totalMaxRollbackDistanceTime = maxRollbackDistanceTime; } if(totalMaxRollbackDistanceStates < maxRollbackDistanceStates){ totalMaxRollbackDistanceStates = maxRollbackDistanceStates; } totalRollbacks += numRollbacks; if (totalMaxStateQSize < maxStateQSize) { totalMaxStateQSize = maxStateQSize; } totalAvgStateQSize += avgStateQSize; totalStateQSizeCalls += stateQSizeCalls; if(numRollbacks != 0){ avgRollbackDistanceTime = (totalRollbackDistanceTime/numRollbacks); avgRollbackDistanceStates = (totalRollbackDistanceStates/numRollbacks); totalAvgRollbackDistanceTime = totalAvgRollbackDistanceTime + avgRollbackDistanceTime; totalAvgRollbackDistanceStates += avgRollbackDistanceStates; } statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << id << "] " ; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << lpStats.eventIds[i] << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << numRollbacks << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << avgRollbackDistanceTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << avgRollbackDistanceStates << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << maxRollbackDistanceTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << maxRollbackDistanceStates << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << maxStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << avgStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << stateQSizeCalls << "] "; *statsFile << endl; } *statsFile << "------------------------------------------------------------------------------\n"; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << id << "] " ; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << "##] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalRollbacks << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalAvgRollbackDistanceTime/numObjects << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalAvgRollbackDistanceStates/numObjects << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalMaxRollbackDistanceTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalMaxRollbackDistanceStates << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalMaxStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalAvgStateQSize/numObjects << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << totalStateQSizeCalls << "] "; *statsFile << endl; *statsFile << "------------------------------------------------------------------------------\n"; *statsFile << "------------------------------------------------------------------------------\n"; *statsFile << " Min Sz Avg Sz Max Sz Min Sz Avg Sz Max Sz StateQ \n"; *statsFile << "LP # Obj# StateQ StateQ StateQ StateQ StateQ StateQ Sampls \n"; *statsFile << " State State State Time Time Time \n"; *statsFile << "------------------------------------------------------------------------------\n"; for(i=0; i < numObjects; i++){ int minStateQSize = simObjArray[i].getMinimumStateQSize(); int maxStateQSize = simObjArray[i].getMaximumStateQSize(); int avgStateQSize = simObjArray[i].getAvgStateQSize(); VTime minStateQSizeTime = simObjArray[i].getMinimumStateQTimeDiff(); VTime maxStateQSizeTime = simObjArray[i].getMaximumStateQTimeDiff(); VTime avgStateQSizeTime = simObjArray[i].getAvgStateQTimeDiff(); int stateQSizeCalls = simObjArray[i].getNumStateQSizeCalls(); statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << id << "] " ; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(2) << lpStats.eventIds[i] << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << minStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << avgStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << maxStateQSize << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << minStateQSizeTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << avgStateQSizeTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << maxStateQSizeTime << "] "; statsFile->setf(ios::right,ios::adjustfield); *statsFile << "[" << setw(5) << stateQSizeCalls << "] "; *statsFile << endl; } *statsFile << "-----------------------------------------------" << endl; *statsFile << "**** Time Warp Logical Process Statistics *****" << endl; *statsFile << "-----------------------------------------------" << endl; *statsFile << "Total Execution time : " << lpStats.getExecutionTime() << " seconds" << endl; *statsFile << "Total number of processed Messages : " << lpStats.getTotalEventsProcessed() << endl; *statsFile << "Total number of committed Messages : " << lpStats.getTotalEventsCommitted() << endl; *statsFile << "Total number of Messages Undone : " << lpStats.getTotalEventsUndone() << endl; *statsFile << "Period used to calculate GVT : " << MAX_GVT_COUNT << endl; *statsFile << "Number of simulation cycles : " << cycleCount << endl; *statsFile << "-----------------------------------------------" << endl; *statsFile << " Message Aggregation Statistics " << endl; *statsFile << "-----------------------------------------------" << endl; *statsFile << "Total Number of Application Messages Sent : " << lpStats.getNumberOfApplicationMessagesSent() << endl; *statsFile << "Total Number of Physical Messages Sent : " << lpStats.getNumberOfPhysicalMessagesSent() << endl; *statsFile << "Total Number of Application Messages Received : " << lpStats.getNumberOfApplicationMessagesRecv() << endl; *statsFile << "Total Number of Physical Messages Received : " << lpStats.getNumberOfPhysicalMessagesRecv() << endl; #ifdef MESSAGE_AGGREGATION *statsFile << "-----------------------------------------------" << endl; *statsFile << " Printing Aggregate Sizes for LP " << id << endl; *statsFile << "-----------------------------------------------" << endl; for(i = 0; i < MAX_AGGREGATE_SIZE; i++){ if(lpStats.statsArray[i] != 0) { *statsFile << i << " " << lpStats.statsArray[i] << endl; } } *statsFile << "-----------------------------------------------" << endl; *statsFile << " Printing Trigger Frequency for LP " << id << endl; *statsFile << "-----------------------------------------------" << endl; *statsFile << "# of Triggers due to Window Size Exhaustion :" << logicalProcessStats::window_size_exhausted << endl; *statsFile << "# of Triggers due to no messages in input Q :" << logicalProcessStats::waited_long_trigger << endl; *statsFile << "# of Successful Triggers due to no messages in input Q :" << logicalProcessStats::waited_too_long << endl; *statsFile << "# of Triggers due to receiving a control message :" << logicalProcessStats::received_control_message << endl; *statsFile << "# of Triggers due to full aggregation buffer :" << logicalProcessStats::write_buffer_full << endl; *statsFile << "# of Triggers due to rollback :" << logicalProcessStats::got_roll_back << endl; *statsFile << "Average Over-Optimistic Lookahead :" << (logicalProcessStats::overoptimistic_lookahead)/(logicalProcessStats::lookaheadCount) << endl; #endif lpStats.printNumberOfKernelMessages(statsFile); }; #endif #ifdef DISPLAY_SIM_CONFIG void LogicalProcess::printConfigInfo() { #ifdef JACKY_SINGLE_MACHINE cout << "\n\tBUILD FOR RUNNING ON SINGLE MACHINE!" < LTSF Scheduling " < Using external agent " << endl; #endif //------------------ State saving strategies --------------------------------- #ifdef STATEMANAGER cout <<"\t\t--> Saving state information every event " << endl; #endif #ifdef INFREQSTATEMANAGER cout <<"\t\t--> Infrequent state saving : InfreqStateManager" << endl; cout <<"\t\t State Period = " << STATE_PERIOD << endl; #endif #ifdef LINSTATEMANAGER cout <<"\t\t--> Infrequent state saving : LinStateManager" << endl; #endif #ifdef NASHSTATEMANAGER cout <<"\t\t--> Infrequent state saving : NashStateManager" << endl; #endif #ifdef COSTFUNCSTATEMANAGER cout <<"\t\t--> Infrequent state saving : CostFuncStateManager" << endl; #endif #ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING cout <<"\t\t--> Message type-based state saving" << endl; #endif //---------------- GVT estimation strategies ------------------------------- #ifdef GVTMANAGER cout <<"\t\t--> pGVT algorithm for GVT estimation " << endl; #if defined(INFREQSTATEMANAGER) && defined(JACKY_FOSSIL_COLLECTION_STRATEGY_A) cout <<"\t\t \tStrategy A!" << endl; #elif defined(INFREQSTATEMANAGER) && defined(JACKY_FOSSIL_COLLECTION_STRATEGY_B) cout <<"\t\t \tStrategy B!" << endl; #endif //end #endif //end GVTMANAGER #ifdef MATTERNGVTMANAGER cout <<"\t\t--> Mattern's algorithm for GVT estimation " << endl; #endif //--------------- Memory management strategies ----------------------------- #ifdef SEGREGATEDMEMALLOC cout <<"\t\t--> Custom memory manager : SegregatedMemAlloc"< Custom memory manager : GlobalMemoryManager"< Custom memory manager : BuddyMemoryManager"< Custom memory manager : BrentMemAlloc"< System's memory manager "< One anti message optimization "< Message aggregation optimization : "; #ifdef MEAN_OF_FACTORS cout <<"MEAN_OF_FACTORS" << endl ; #elif defined(FIXED_PESSIMISM) cout <<"FIXED_PESSIMISM" << endl ; #elif defined(FIXED_MSG_COUNT) cout <<"FIXED_MSG_COUNT" << endl ; #elif defined(PROBE_SEND_RECEIVE_MESSAGES) cout <<"PROBE_SEND_RECEIVE_MESSAGES" << endl ; #elif defined(ADAPTIVE_AGGREGATION) cout <<"ADAPTIVE_AGGREGATION" << endl ; #else cout <<"ERROR ! NO OPTIMIZATION SPECIFIED. " << endl ; #endif //end aggregation strategies #endif //end message aggregation //---------------------- Infrequent polling ---------------------------------- #ifdef INFREQ_POLLING cout <<"\t\t--> Infrequent Polling optimization : "; #ifdef INFREQ_STATIC cout <<"INFREQ_STATIC" << endl; #elif defined(INFREQ_FXD_INCREMENT) cout <<"INFREQ_FXD_INCREMENT" << endl; #elif defined(INFREQ_VAR_INCREMENT) cout <<"INFREQ_VAR_INCREMENT" << endl; #else cout <<"ERROR ! NO OPTIMIZATION SPECIFIED. " << endl; #endif //end polling strategies #endif //end Infrequent polling //--------------------- Cancellation strategies ------------------------------ #ifdef LAZYCANCELLATION cout <<"\t\t--> Lazy cancellation strategy "< Dynamic cancellation strategy "< Aggressive cancellation strategy "< simualte() called" << endl; jacky_os << "*****************************************************************" << endl << flush; jacky_os << "**** LP" << id << " simUntil = " << simUntil.asString() << " ****" << endl << flush; jacky_os << "*****************************************************************" << endl << flush; #endif register int gvtcount=0; #ifdef JACKY_SHOWGVT // [2006-01-13] //initialize the GVT to 0 at the biginning VTime previousGVT = VTime::Zero; VTime currentGVT = VTime::Zero; //flag to control printing GVT at time 0 bool printGVT = true; #endif // end JACKY_SHOWGVT if(initialized == false) { cerr << "allRegistered not called in LP " << id << endl; } #ifdef LPDEBUG outFile << "LP " << id << " waiting for start signal" << endl; #endif // wait for init messages for every object in the system, except for our own... //Jacky: see the current time before sending StartMsg //cout << "\tLP " << id << ": entering StartMsg s/r @ " << time(NULL) << endl; if(id == 0) { StartMsg foo; //Jacky: check LP0 can block other LPs for 3 seconds //sleep(3); //LP0 sleeps 3 seconds before sending the StartMsg // let's send out start messages - we don't need one ourselves... for(i = 1; i < numLPs; i++) { foo.destLP = i; //Jacky: print the time when LP0 sends out the StartMsg //cout << "LP0 send StartMsg to LP" << i << "@" << time(NULL) << endl; comm.recvMsg(&foo); } } else { comm.waitForStart(); } //cout << "LP " << id << ": starting simulation." << endl; //Jacky: print the time when each LP resumes simulation //cout << "LP " << id << ": starting simulation @ " << time(NULL) << endl; // this is the actual execution of the simulation simulationFinished = false; #if defined(INFREQ_POLLING) int infreqSkips = 0 ; int infreqMsgsGot = 0 ; float infreqCurFreq = 1 ; #ifdef INFREQ_STATIC infreqCurFreq = pollStatic ; #endif #ifdef INFREQ_TRACE infreqTraceFile << infreqCurFreq << endl; #endif #endif #ifdef JACKY_STATISTICS //[2006-04-02] BasicTimeWarp::initializationWatch.stop(); BasicTimeWarp::initializationTime += BasicTimeWarp::initializationWatch.elapsed(); //a flag to switch on/off the dormantWatch //1. if dormantSwitch = false & nextEvent = NULL -> start dormantWatch, set dormantSwitch = true // => we have entered empty loop, start timing now. In the follwing loops, we will not restart // the watch again since the switch is set to true! // //2. if dormantSwitch = true & nextEvent != NULL -> stop dormantWatch, set dormantSwitch = false, // add elapsed time to "dormantTime" // => we have reactivated again, stop timing now. In the following busy loop, we will not stop // the watch again since the switch is set to false bool dormantSwitch = false; #endif //end JACKY_STATISTICS //[2006-04-02] #ifndef JACKY_REVISION while( gVT.getGVT() < SIMUNTIL) { #else while( gVT.getGVT() < simUntil) { #endif // end JACKY_REVISION cycleCount++; #ifdef JACKY_DEBUG /* if(cycleCount == 1){ cout << "LP[" << getLPid() << "] enters while loop!" << endl; sleep(1); } */ jacky_os << endl << flush; jacky_os << "while loop [cycleCount = " << cycleCount << "]" << endl << flush; jacky_os << "\t-----> gVT.getLGVT() = " << gVT.getLGVT() << endl << flush; #endif //end JACKY_DEBUG #if defined(MPI) || defined(TCPLIB) || defined(FWNS) if (numLPs > 1) { #ifdef LPDEBUG int numReceived = comm.recvMPI(1000); outFile << numReceived << " messages received" << endl; #else #if !defined(INFREQ_POLLING) #ifdef MCS_TRACE //comm.recvMPI(lrand48()%1001); mcsTraceFile << comm.recvMPI(1000) << endl; #else //comm.recvMPI(lrand48()%1001); comm.recvMPI(1000); //Jacky: this will be called to receive MPI msgs from other LPs #endif //end MCS_TRACE #else // ifdef INFREQ_POLLING if(infreqSkips == 0) { // we skipped the number of events we wanted to skip // call mpi and get number of messages recvd in infreqMsgsRecvd infreqMsgsGot = comm.recvMPI(1000); #ifdef MCS_TRACE mcsTraceFile << infreqMsgsGot << endl; #endif //end MCS_TRACE #ifndef INFREQ_STATIC if( infreqMsgsGot > 0){ // we got one or more messages if(infreqMsgsGot > 1) { // we got more than one message , we are polling too infrequently // let's decrease polling freq #ifdef INFREQ_FXD_INCREMENT infreqCurFreq -= pollDelta ; #elif defined(INFREQ_VAR_INCREMENT) infreqCurFreq *= (1 - pollDelta) ; #endif //end INFREQ_FXD_INCREMENT if(infreqCurFreq < 1) infreqCurFreq = 1 ; } }else{ // we got zero messages , let's increase poll freq #ifdef INFREQ_FXD_INCREMENT infreqCurFreq += pollDelta ; #elif defined(INFREQ_VAR_INCREMENT) infreqCurFreq *= (1 + pollDelta ) ; #endif //end INFREQ_FXD_INCREMENT if(infreqCurFreq > pollMax) infreqCurFreq = pollMax ; } #endif // INFREQ_STATIC infreqSkips = (int)infreqCurFreq; #ifdef INFREQ_TRACE infreqTraceFile << infreqSkips << endl; #endif // end INFREQ_TRACE } else { infreqSkips-- ; } #endif // INFREQ POLLING #endif // LPDEBUG line #ifdef MESSAGE_AGGREGATION #ifndef ADAPTIVE_AGGREGATION incrementAgeOfAggregatedMessage(); #endif #endif } //end if (numLPs > 1) #endif // if defined(MPI) || defined(TCPLIB) // seek to the first event to execute in the queue //Jacky: this will NOT change "alreadyProcessed" of the event // alreadyProcessed should still be FALSE! BasicEvent *nextEvent = BasicTimeWarp::inputQ.getEventToSchedule(); #ifdef JACKY_DEBUG if(nextEvent != NULL){ //BasicEvent temp(nextEvent); //temp.alreadyProcessed = nextEvent->alreadyProcessed; TWMessage* temp = dynamic_cast(nextEvent); Message* tempmsg = temp->getMessage(); jacky_os <<"Execute event -> id=" << nextEvent->eventId << " sign=" << nextEvent->sign << " " << nextEvent->sender << "@" << nextEvent->sendTime << " =" << tempmsg->type() << "=> " << nextEvent->dest << "@" << nextEvent->recvTime << " [address = " << nextEvent << "]" << flush; #ifdef JACKY_EVENT_JUMP // ~~~~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ jacky_os << " [Event = T]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(tempmsg->type() == "D") { //Done message, print out nextChange() and isFromSlave() jacky_os << " / nextChange=" << ((DoneMessage*)tempmsg)->nextChange() << flush; } if(tempmsg->type() == "X") { //External message, print out enderModelId(), value(),and destPort BasicExternalMessage* xMsg = (BasicExternalMessage*)tempmsg; jacky_os << " / (X) msg: senderModelId = " << xMsg->senderModelId() << flush ; string sendingModel = SingleParallelModelAdm::Instance().model(xMsg->senderModelId()).description(); jacky_os << " / sendingModel = " << sendingModel << " / value=" << ((RealMsgValue*)(xMsg->value()))->v << " / destPort = " << (xMsg->port()).name() << "@" << (xMsg->port()).model().description() << flush; } jacky_os<< endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } else { jacky_os << "event is NULL!\n" << flush; } #endif #ifdef MESSAGE_AGGREGATION #ifdef FIXED_MSG_COUNT if(nextEvent == NULL) { //The inputQ is empty //flushing when the inputQ is empty comm.flushIfWaitedTooLong(); } #endif #if defined (ADAPTIVE_AGGREGATION) || defined(PROBE_SEND_RECEIVE_MESSAGES) if(nextEvent == NULL) { //The inputQ is empty //flushing when the inputQ is empty comm.flush(); } else { comm.checkToSend(BasicTimeWarp::inputQ.getNextUnprocessedEventTime()); } #endif #endif //end MESSAGE_AGGREGATION int executeCycle = 0; // if the event is not already processed, execute this event if(nextEvent != NULL){ //1{ #ifdef JACKY_STATISTICS //[2006-04-02] if( dormantSwitch == true ){ //stop dormantWatch if nextEvent != NULL & dormantSwitch = true BasicTimeWarp::dormantWatch.stop(); #ifdef JACKY_DEBUG jacky_os << endl << "[WATCH] stop dormant watch !!!" << endl << endl << flush; #endif //reset the switch dormantSwitch = false; //we are not dormant now BasicTimeWarp::dormantTime += BasicTimeWarp::dormantWatch.elapsed(); } #endif //end JACKY_STATISTICS //[2006-04-02] #if (defined(MESSAGE_AGGREGATION) && defined(STATS)) lpStats.incrementLookahead(nextEvent->recvTime - gVT.getGVT() ); #endif #ifdef LPDEBUG outFile << *nextEvent << "\n"; #endif if(nextEvent->alreadyProcessed != true){ //nextEvent is not NULL, and it's not yet processed 2{ #ifdef EXTERNALAGENT if(id == 0){ if(gVT.numberOfEvents > MAX_EVENTS_TO_WAIT && gVT.tokenAlreadySent == false){ cout << "numberOfEvents = " << gVT.numberOfEvents << " && tokenAlreadySent = " << gVT.tokenAlreadySent << endl; gVT.startExternalAgent(); gVT.numberOfEvents = 0; } else { gVT.numberOfEvents++; } } else { if(gVT.tokenReceived == true){ gVT.numberOfEvents = 0; gVT.tokenReceived = false; } else{ gVT.numberOfEvents++; gVT.readyToPassItOn(); } } #endif #ifndef JACKY_REVISION if(nextEvent->recvTime <= SIMUNTIL){ #else if(nextEvent->recvTime <= simUntil){ //before the given stopTime //3{ #endif // end JACKY_REVISION //Jacky: now a non-NULL unprocessed event with time <= stopTime is found //set the flag ranBetweenTokens in GVTManager gVT.setRanBetweenTokens(true) ; /*Jacky Note: scheduler.runProcesses() will invoke TimeWarp::executeSimulation() on the receiving simObj (destination ParallelProcessor) ** 1> the TimeWarp::simulate() is called ** a> set the current state's inputPos = inputQ.getCurrent() ** b> ParallelProcessor::executeProcess() is called ** //Note: if a rollback is needed, it should have happened before now ** -> 1> call TimeWarp::getEvent() ** -> a> call LTSFInputQueue::getAndSeek() ** -> this will set alreadyProcessed = TRUE ** b> if the event != NULL ** -> set current state's lVT = toExecute->recvTime ** 2> call rollbackCheck(msg->time()) ** 3> call receive() function ** 4> delete the received msg ** 2> the TimeWarp::saveState() is called ** a> set current state's outputPos = outputQ.getTail() //ptr to Container ** b> set current state's myLastOutput = true ** c> call StateManager::saveState() ** -> 1> allocate a new state (tempState) //getProcessPointer()->allocateState() ** 2> call tempState->copyState(current) to make a copy of the current state ** 3> insert the tempState into the stateQ -> simply add the state to the END ** of the stateQ */ scheduler.runProcesses(); //nextEvent = BasicTimeWarp::inputQ.seek(1,CURRENT); } //3} } else { //the event is already processed! //search for the next unprocessed event nextEvent = BasicTimeWarp::inputQ.seek(1,CURRENT); } //2} #ifdef JACKY_DEBUG #ifdef JACKY_PRINT_INPUTQ //output the inputQ into the inputQXX.output file inputQstream << "@ cycleCount = " << cycleCount << "------------------->" << endl << flush; //1. get the head of the inputQ on this LP SortedListOfEvents* inputQueue = dynamic_cast* >(scheduler.queuePtr); BasicEvent* eventCursor = (BasicEvent*)(inputQueue->getHead()); //2. print out all BasicEvents in the queue while(eventCursor != NULL){ TWMessage* tempTWmsg = dynamic_cast(eventCursor); Message* tempCDmsg = tempTWmsg->getMessage(); inputQstream << "[" << eventCursor << "] <" << " id=" << eventCursor->eventId << " sign=" << eventCursor->sign << " " << eventCursor->sender << "@" << eventCursor->sendTime << " =" << tempCDmsg->type() << "=> " << eventCursor->dest << "@" << eventCursor->recvTime << " alreadyProcessed?" << eventCursor->alreadyProcessed << " >"; if(eventCursor == nextEvent){ inputQstream << " <-- Executed!" << endl << flush; } else { inputQstream << endl << flush; } #ifdef JACKY_DESTROY_MESSAGE delete tempCDmsg; #endif eventCursor = eventCursor->next; } //end while if(inputQueue->getHead() != NULL){ //head != NULL eventCursor = (BasicEvent*)(inputQueue->getHead()); inputQstream << "\t head --> id = " << eventCursor->eventId << " / sender = " << eventCursor->sender << endl << flush; } else { inputQstream << "\t head = NULL!" << endl << flush; } if(inputQueue->getCurrent() != NULL){ //currentPos != NULL eventCursor = (BasicEvent*)(inputQueue->getCurrent()); inputQstream << "\t currentPos --> id = " << eventCursor->eventId << " / sender = " << eventCursor->sender << endl << flush; } else { inputQstream << "\t currentPos = NULL!" << endl << flush; } if(inputQueue->getInsertPos() != NULL){ //insertPos != NULL eventCursor = (BasicEvent*)(inputQueue->getInsertPos()); inputQstream << "\t insertPos --> id = " << eventCursor->eventId << " / sender = " << eventCursor->sender << endl << flush; } else { inputQstream << "\t insertPos = NULL!" << endl << flush; } if(inputQueue->getFindPos() != NULL){ //currentPos != NULL eventCursor = (BasicEvent*)(inputQueue->getFindPos()); inputQstream << "\t findPos --> id = " << eventCursor->eventId << " / sender = " << eventCursor->sender << endl << flush; } else { inputQstream << "\t findPos = NULL!" << endl << flush; } if(inputQueue->getTail() != NULL){ //tail != NULL eventCursor = (BasicEvent*)(inputQueue->getTail()); inputQstream << "\t tail --> id = " << eventCursor->eventId << " / sender = " << eventCursor->sender << endl << flush; } else { inputQstream << "\t tail = NULL!" << endl << flush; } inputQstream << endl << flush; #endif #endif //end JACKY_DEBUG (inputQ printed when nextEvent != NULL) } //end if nextEvent != NULL 1} #ifdef JACKY_DEBUG #ifdef JACKY_PRINT_INPUTQ if(nextEvent == NULL){ inputQstream << "@ cycleCount = " << cycleCount << "-----------> No more event to execute!" << endl << flush; } #endif #endif #ifdef JACKY_STATISTICS //[2006-04-02] if( (nextEvent == NULL) && (dormantSwitch == false) ){ //start dormantWatch if nextEvent = NULL & dormantSwitch = false BasicTimeWarp::dormantWatch.start(); #ifdef JACKY_DEBUG jacky_os << endl << "[WATCH] start dormant watch !!!" << endl << endl << flush; #endif //reset the switch dormantSwitch = true; //we are dormant now } #endif //end JACKY_STATISTICS //[2006-04-02] //now, code for NULL or non-NULL events executeCycle++; nextEvent = BasicTimeWarp::inputQ.get(); if(gvtcount < MAX_GVT_COUNT ) { gvtcount++; } else { gVT.calculateLGVT(); //calculate LGVT every MAX_GVT_COUNT iterations gvtcount = 0; getGVT = false; #ifndef JACKY_SHOWGVT // [2006-01-13] ------------------------------- cout << "GVT: " << gVT.getGVT() << endl; //show the current GVT #else //-------------------------------------------------------------- //show GVT only when it changes currentGVT = gVT.getGVT(); if( currentGVT == VTime::Zero && printGVT // && getLPid() == 0 // only print GVT by LP0 ){ cout << "GVT [" << getLPid() << "]: " << currentGVT.asString() << endl; // only print GVT by LP0 //cout << "GVT : " << currentGVT.asString() << endl; printGVT = false; // only print GVT at 0 once } else if ( previousGVT != currentGVT // && getLPid() == 0 // only print GVT by LP0 ) { cout << "GVT [" << getLPid() << "]: " << currentGVT.asString() << endl; //cout << "GVT : " << currentGVT.asString() << endl; // only print GVT by LP0 previousGVT = currentGVT; } #endif // end JACKY_SHOWGVT #ifdef JACKY_DEBUG jacky_os << "calculate LGVT = " << gVT.getLGVT() << " GVT = " << gVT.getGVT() << "\n"; #endif } } // while ( gVT.getGVT() < simUntil && simulationFinished == false ) #ifdef MESSAGE_AGGREGATION //Let me flush out all my send buffers, when I have reached PINFINITY comm.flush(); #endif // this might need to move, if more than one call to simulate is needed // Garbage collect the output queues first and then // garbage collect the rest of the stuff in each process. // This is needed to avoid memory errors in the system, when the // output queue container is pointing to an event in the input queue // of a process with a lower id (on the same lp). for(i = 0; (i < totalObjects); i++) { simArray[i].ptr->finalGarbageCollectOutputQueue(); } for(i = 0; (i < totalObjects); i++) { simArray[i].ptr->finalize(); simArray[i].ptr->finalGarbageCollect(); } #ifdef JACKY_DEBUG jacky_os << "LP" << getLPid() << "simArray gc finished!\n"; #endif #ifdef STATS // stop the clock !! lpStats.lpStopWatch.stop(); #endif if(id == 0) { cout << "Simulation complete!" << endl; #ifdef JACKY_DEBUG jacky_os << "Simulation complete!\n"; #ifdef JACKY_PRINT_INPUTQ inputQstream << flush; inputQstream.close(); #endif #endif } #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] cout << "-------------------------- Statistics ---------------------------" << endl; cout << "LP[" << id << "] (ER) = " << scheduler.getPositiveMsgNumber() << " / (EI) = " << scheduler.getNegativeMsgNumber() << " / (PR) = " << scheduler.getPositiveStragglerMsgNumber() << " / (SR) = " << scheduler.getNegativeStragglerMsgNumber() << " / (RB) = " << scheduler.getRollbackNumber() << " / (RBL) = " << scheduler.getUnprocessedEventNumber() << endl << flush; #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] #ifdef JACKY_STATISTICS //[2006-04-02] cout << "LP[" << id << "] localObjs = " << numObjects << " / (SS) = " << BasicTimeWarp::totalNumberOfStatesSaved << " / (SK) = " << BasicTimeWarp::totalNumberOfStatesSkipped << " / (SR) = " << BasicTimeWarp::totalNumberOfStatesOmitted << " / (EE) = " << BasicTimeWarp::totalNumberOfEventsExecuted << " / (CFL) = " << BasicTimeWarp::totalNumberOfEventsCoastedForward << endl << flush; cout << "LP[" << id << "] (LH) = " << BasicTimeWarp::totalNumberOfLazyHit << " / (LM) = " << BasicTimeWarp::totalNumberOfLazyMiss << endl << flush; cout << "LP[" << id << "] (ST) = " << BasicTimeWarp::totalTimeForStateSaving << " ns " << " / (ET) = " << BasicTimeWarp::totalTimeForEventExecution << " ns " << " / (CT) = " << BasicTimeWarp::totalTimeForCoastForward << " ns" << " / (RT) = " << BasicTimeWarp::totalTimeForRollback << " ns" << endl << flush; cout << "LP[" << id << "] (BT) = " << BasicTimeWarp::initializationTime << " ns" << endl << flush; cout << "LP[" << id << "] (DT) = " << BasicTimeWarp::dormantTime << " ns" << endl << flush; if(id == 0){ cout << "(Inter-LP Message Size) = " << TWExternalMessage::getTWExternalMessageSize() << " bytes" << endl << flush; } #endif //end JACKY_STATISTICS //[2006-04-02] } void LogicalProcess::registerObject(BasicTimeWarp* handle) { numRegistered++; if (handle->id >= totalObjects) { ostrstream errmsg; char* errstr; errmsg << "Object id " << handle->id << " trying to register, but " << totalObjects-1 << " is the largest id allowed!" << ends; errstr = errmsg.str(); TerminateMsg * ouch = new TerminateMsg; strcpy(ouch->error, errstr); comm.recvMsg(ouch); } if(simArray[handle->id].ptr == NULL){ //insert process into the process scheduler handle->setSchedulerHandle(&scheduler); scheduler.initialize(&(handle->inputQ), simArray, id); handle->localId = numRegistered - 1; // now let's let the Communication guy know that we're registering this dude simArray[handle->id].ptr = handle; simArray[handle->id].lpNum = id; #ifdef STATS lpStats.setEventIds(handle->id); // store event ids for later use #endif } else { cerr << "Object " << handle->id << " trying to register twice!" << endl; TerminateMsg * ouch = new TerminateMsg; strcpy(ouch->error, "Object tried to register twice" ); comm.recvMsg(ouch); } handle->setCommHandle(simArray); handle->gVTHandle = &gVT; // this needed by MESSAGE_AGGREGATION, ONE_ANTI_MESSAGE, and for stats collection handle->setLPHandle(this); #ifdef STATS handle->setStatsFile(&commitStatsFile); #endif #if defined(LPDEBUG) || defined(OPENLPDEBUGFILE) handle->setFile(&outFile); #endif #ifdef STATEDEBUG handle->setStateFile(&outFile2); #endif #ifdef OBJECTDEBUG handle->openFile(); #endif #if defined (LAZYCANCELLATION) || defined(LAZYAGGR_CANCELLATION) #ifdef STATS char dcStatsFilename[80]; strcpy(dcStatsFilename,handle->name); strcat(dcStatsFilename,".dcstats"); handle->openDCStatsFile(dcStatsFilename); #endif #endif } void LogicalProcess::allRegistered( void ) { int i,j; if( numRegistered == numObjects ){ gVT.initGVTManager( simArray, &comm, id, totalObjects, numObjects, numLPs); if(id == 0){ // at this point, if we are object 0, we should wait for init messages // from everyone... comm.waitForInit(totalObjects - numObjects); // now we've got all of the objects... we need to send out all of the // objects to every one else InitMsg initMsg; for(i = 0; i < totalObjects; i++) { initMsg.objId = i; initMsg.lpId = simArray[i].lpNum; for(j = 1; j < numLPs; j++) { initMsg.destLP = j; comm.recvMsg(&initMsg); } // for j } // for i } // if id == 0 else { // we should walk through our local array sending the objects to the // main guy... for(i = 0 ; i < totalObjects; i++ ){ if(simArray[i].lpNum == id) { comm.iHaveObject(simArray[i].ptr->id); } // if } // for // OK, now we need to receive all of the data FROM the head guy... comm.waitForInit( totalObjects ); } //cout << "LP " << id << ": initializing simulation objects" << endl << flush; #ifndef JACKY_RB_ZERO //Nov. 7, 2005 original version =============================================== // call the initialization routine and save initial state for everyone for(i = 0; i < totalObjects; i++) { if (simArray[i].lpNum == id) { // NEW_STATE_MANAGEMENT // With the new state management in place LogicalProcess calls the // protected virtual method in BasicTimeWarp which is overloaded // in TimeWarp that creates a new state and inturn calls initialize() simArray[i].ptr->timeWarpInit(); simArray[i].ptr->saveState(); #if defined(INFREQSTATEMANAGER) && defined(JACKY_INFREQ_STATEMANAGER) //[2006-03-13] //Jacky Note: after saving the initial state, the "timeAtLastCall" & "periodCounter" //in the InfreqStateManager has been changed! //timeAtLastCall = 0 (rather than -1), periodCounter = statePeriod (rather than 0) // //However, we need the processors save the 1st state after processing the 1st event //at time 0 when skipStateSaving = false. i.e. NC & FC save state after processing //the 1st (D) message, and Simulators saves state after processing the 1st (*) message // //So we need to rerset the periodCounter & timeAtLastCall back to their initial value #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); jacky_os << "[InfreqStateManager] TW[" << simArray[i].ptr->id << "]After saving initial state, reset timeAtLastCall & periodCounter!" << endl << endl << flush; #endif simArray[i].ptr->state->resetTimeAtLastCallAndPeriodCounter(); #endif //end INFREQSTATEMANAGER) && JACKY_INFREQ_STATEMANAGER //[2006-03-13] } } #else // new version ================================================================================= //Jacky Note: Since the afterInitialize() function in ParallelMainSimulator is called AFTER // this function, the states of the models (CoupledCell & all Cells) are initialized // AFTER the initial states are saved onto the stateQ. This causes problem when we // try to rollback to "before zero". When we restore the current state to the initial // state on the stateQ, it doesn't contain the information of the models! //We need to call afterInitialize() 1> AFTER allocating the current states & 2> BEFORE saving //the current states onto the stateQ // call the initialization routine and save initial state for everyone for(i = 0; i < totalObjects; i++) { if (simArray[i].lpNum == id) { //this will allocate the current states & call initialize() on processors simArray[i].ptr->timeWarpInit(); } } //now, let's call afterInitialize() defined in ParallelMainSimulator ParallelMainSimulator::Instance().afterInitialize(); for(i = 0; i < totalObjects; i++) { if (simArray[i].lpNum == id) { //this will save a copy of the current state on the stateQ simArray[i].ptr->saveState(); } } #endif //end JACKY_RB_ZERO =========================================================================== // This has to be done later than the previous loop because of OFC. for(i = 0; i < totalObjects; i++) { if (simArray[i].lpNum == id) { // set preconditions for first state simArray[i].ptr->clearInitState(); } } initialized = true; BasicTimeWarp::inputQ.initialized = true; } else { cerr << "LP " << id << " incorrect number of objects registered!" << endl; cerr << "Expected " << numObjects << " objects, and " << numRegistered << " registered!" << endl; TerminateMsg * ouch = new TerminateMsg; strcpy(ouch->error, "invalid number of objects registered" ); comm.recvMsg(ouch); } } int LogicalProcess::getNumObjects() { return numObjects; } int LogicalProcess::getTotalNumberOfObjects() const { return totalObjects; } void LogicalProcess::calculateGVT() { getGVT = true; } #ifdef JACKY_SYNC_TIME0 //[2006-01-23] ------------------------------------------------------------ //define a public function synchronizeInitialization() to implement a synchronization Barrier void LogicalProcess::synchronizeInitialization(){ //call barrierSynchronize() defined in the CommManager [CommMgrInterface.hh/cc] comm.barrierSynchronize(); } #endif // end JACKY_SYNC_TIME0 -------------------------------------------------------------------- #endif