/*******************************************************************
*
*  DESCRIPTION: class ParallelProcessor
*
*  AUTHOR: Alejandro Troccoli
*
*  EMAIL: mailto://atroccol@dc.uba.ar
*
*  DATE: 07/11/2000
*
*******************************************************************/

/** include files **/
#include "strutil.h"
#include "pprocess.h"          // header
#include "pmessage.h"
#include "model.h"             // class Model
#include "parsimu.h"
#include "msgbag.h"
#include "log.h"               // class Log

#include "JackyDebugStream.h"  // jacky-debug-mode

#ifdef JACKY_REVISION
#include "psimulat.h"          // class ParallelSimulator for parentFCId
#endif

#include "pncoord.h"           // class ParallelNodeCoordinator
#include "pfcoord.h"           // class ParallelFlatCoordinator
#include "BasicEvent.hh"       // BasicEvent

/** private data **/

// Value of Log::Default.logType() when only (Y) messages are logged
// (command line option -LY), as stated by the notes in initialize().
static const int kLogTypeOutputOnly = 8;

/** public data **/
const ProcId ParallelProcessor::InvalidId( -1 ) ;

/** public functions **/

/*******************************************************************
* Function Name: Constructor
* Description: binds the processor to its model; the processor id
*              starts as InvalidId until id( const ProcId & ) is called.
*              The model pointer must not be NULL (checked by MASSERT).
********************************************************************/
ParallelProcessor::ParallelProcessor( Model *m )
	: lastMsgTime( VTime::Zero )
	, mdl( m )
	, ident( InvalidId )
{
	MASSERT( m );
}

/*******************************************************************
* Function Name: Destructor
* Description: for the NoTime kernel, releases the log streams that
*              initialize() created.
********************************************************************/
ParallelProcessor::~ParallelProcessor()
{
#ifndef KERNEL_TIMEWARP
	//Close all open files...
	for( int i = 0; i < numOutFiles; i++ )
	{
		// initialize() stores &cout when logging to stdout; that stream
		// is not owned by this object and must never be deleted.
		if( outFileQ[i] != &cout )
			delete outFileQ[i];
	}

	// The table itself is allocated with new ostream*[1], so it has to be
	// released with the array form of delete.
	// NOTE(review): assumes numOutFiles/outFileQ are zero/NULL when
	// initialize() never created a log - confirm against the base class.
	delete [] outFileQ;
#endif
}

/*******************************************************************
* Function Name: id
* Description: sets both the DEVS processor id and the Warped object id.
* Returns: this processor, to allow call chaining.
********************************************************************/
ParallelProcessor &ParallelProcessor::id( const ProcId &id )
{
	//DEVS processor Id
	ident = id ;

	//Warped Object Id
	SimulationObj::id = id;

	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
* Note: the trailing return is only reached if MTHROW does not
*       transfer control; it keeps every path returning a value.
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const InitMessage & )
{
	InvalidMessageException e ;
	e.addText( "The abstract base class Processor cannot receive InitMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const CollectMessage & )
{
	InvalidMessageException e ;
	e.addText( "The abstract base class Processor cannot receive CollectMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const InternalMessage & )
{
	InvalidMessageException e ;
	e.addText( "The abstract base class Processor cannot receive InternalMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const OutputSyncMessage & )
{
	InvalidMessageException e ;
	e.addText( "The abstract base class Processor cannot receive OutputSyncMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const BasicOutputMessage * )
{
	InvalidMessageException e ;
	e.addText( "The abstract base class Processor cannot receive OutputMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

/*******************************************************************
* Function Name: receive
* Description: adds the external (X) message to the externalMsgs bag.
*              When JACKY_MSG_TYPE_BASED_STATE_SAVING is defined it
*              also raises skipStateSaving.
* Returns: this processor.
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const BasicExternalMessage *msg )
{
	// cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
	//      << "] processor.receive (BasicExternalMessage) in " << model().asString()
	//      << "msg.time = " << msg->time().asString() << endl ;

	externalMsgs.add( msg );

//#ifdef JACKY_X_SKIP_SAVING // Nov. 23, 2005
#ifdef JACKY_MSG_TYPE_BASED_STATE_SAVING //[2006-06-19]
	//the same effect as the JACKY_X_SKIP_SAVING, but this is to compare the performance improvement
	//before and after using JACKY_MSG_TYPE_BASED_STATE_SAVING
	skipStateSaving = true; //this causes the TimeWarp to skip state saving after processing this event

#ifdef JACKY_DEBUG
	ostream& jacky_os = JackyDebugStream::Instance().Stream();
	jacky_os << "##### Processor::skipStateSaving = TRUE #####" << endl << flush;
#endif //end JACKY_DEBUG
#endif //end JACKY_MSG_TYPE_BASED_STATE_SAVING

	return *this;
}

/*******************************************************************
* Function Name: receive
* Description: just throw exception
********************************************************************/
ParallelProcessor & ParallelProcessor::receive( const DoneMessage & )
{
	InvalidMessageException e ;
	//cerr << "ProcessID: " << id() << endl ;
	e.addText( "The abstract base class Processor cannot receive DoneMessage!" ) ;
	MTHROW( e ) ;
	return *this ;
}

//Jacky: this function was modified by Glinsky. When the model calls outputFunction()
//it will in turn call the ParallelProcessor's sendOutput(). If the corresponding
//model is an Atomic model (i.e. this processor must be a ParallelSimulator) this
//function sends the Y msg to the parentFCId (Flat Coordinator); otherwise, it sends the
//Y msg to parentId (in fact, this otherwise case will not happen since only Atomic models
//will be implemented by the modeler, and sendOutput() will always be called
//by a ParallelSimulator)
/*******************************************************************
* Method: sendOutput
* Description: sends an OutputMessage carrying 'value' on 'port' at
*              'time'. Atomic models send to parentFCId, any other
*              model sends to model().parentId().
*              With JACKY_REVISION the process exits with -1 if this
*              object is not a ParallelSimulator.
* Returns: this processor.
********************************************************************/
ParallelProcessor & ParallelProcessor::sendOutput( const VTime &time, const Port &port, const Value &value )
{
	if( model().isAtomic() )
	{
		// cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		//      << "] processor.sendOutput (OutputMessage) to model.parentFCId=" << this->parentFCId << endl ;

#ifndef JACKY_REVISION
		send( OutputMessage( time, id(), port, value ), this->parentFCId ) ;
#else
		//get the parentFCId (Jacky, Oct. 21, 2005)
		ParallelSimulator* psimulator = dynamic_cast<ParallelSimulator*>(this);
		if( psimulator != NULL ) {
			send( OutputMessage( time, id(), port, value ), psimulator->parentFCId ) ;
		} else {
			cerr << "ParallelProcessor::sendOutput() -> psimulator is NULL!" << endl;
			exit(-1);
		}
#endif
	}
	else
	{
		cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		     << "] Not using FCId for outputMessage (!)" << endl ;
		cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		     << "] processor.sendOutput (OutputMessage) to model.parentID()=" << model().parentId() << endl ;
		send( OutputMessage( time, id(), port, value ), model().parentId() ) ;
	}
	return *this;
}

/*******************************************************************
* Method: sendOutput
* Description: sends a BasicOutputMessage carrying 'value' to
*              model().parentId().
* Returns: this processor.
********************************************************************/
ParallelProcessor & ParallelProcessor::sendOutput( const VTime & time, const Port & port , BasicMsgValue *value )
{
	send( BasicOutputMessage( time, id(), port, value ), model().parentId() ) ;
	return *this;
}

/*******************************************************************
* Function Name: description
* Description: forwards to the description of the associated model.
********************************************************************/
const string ParallelProcessor::description() const
{
	return mdl->description();
}

/*******************************************************************
* Function Name: rollbackCheck
* Description: obsolete; only reports that it was called and
*              returns false.
********************************************************************/
bool ParallelProcessor::rollbackCheck( const VTime& currentTime )
{
	// Jacky: This function has no effect on restoring the content of the MessageBag since rollbacks
	//        happen before this function is called.
	//        We have to use some mechanism defined in the TimeWarp kernel to handle the clearance of
	//        message bags when rollbacks happen
	// Oct. 26, 2005
	/*
	bool rollback;

	if ( currentTime < lastMsgTime )
	{
		//externalMsgs.eraseAll(); //commented by Jacky, Oct. 26, 2005
		rollback = true;
	}
	else
	{
		rollback = false;
	}

	lastMsgTime = currentTime;
	return rollback;
	*/
	cerr << "ParallelProcessor::rollbackCheck() called! This should never happen!" << endl;
	return false;
}

//Functions that are required by Warped

/*******************************************************************
* Function Name: initialize
* Description: reads the logging configuration from Log::Default,
*              stores it in createlog and creates the log sink of
*              this processor when it is supposed to own one.
********************************************************************/
void ParallelProcessor::initialize()
{
	ParallelMainSimulator::Instance().debugStream() << "Initializing Object " << description() << "(" << id() << "): " << flush;

#ifdef KERNEL_TIMEWARP
	//For the TimeWarp Kernel, use the File API provided by TimeWarp
	//
	//Which processors own a log file depends on the build:
	//  A) neither JACKY_UNIQUE_LOG nor JACKY_SINGLE_LOG_FILE_LY (original version):
	//     every processor creates its own log file.
	//  B) JACKY_SINGLE_LOG_FILE_LY only [2006-04-20]:
	//     as A), except that with -LY only the FC creates a log file. Logging the (Y)
	//     messages of the FC is enough for drawlog, and creating all the other files
	//     only lengthens the initialization.
	//  C) JACKY_UNIQUE_LOG [2006-04-26] ("one log per node", includes the -LY enhancement):
	//     with -LY only the FC creates a log file; otherwise only the NC creates one,
	//     and it is shared by all the processors of that node (see writelog()).
	if( Log::Default.createLog() )
	{
		createlog = Log::Default.logType();	//Jacky Note: createlog = 1, 2, 4, 8, 16, 32, 64

		// Set for every logging processor: under JACKY_UNIQUE_LOG processors without
		// their own file still use logIndex to address the queue of the local NC.
		logIndex = 0;

		bool ownsLogFile = true;
#if defined(JACKY_UNIQUE_LOG)
		if( createlog != kLogTypeOutputOnly )
			ownsLogFile = ( dynamic_cast<ParallelNodeCoordinator*>(this) != NULL );
		else
			ownsLogFile = ( dynamic_cast<ParallelFlatCoordinator*>(this) != NULL );
#elif defined(JACKY_SINGLE_LOG_FILE_LY)
		if( createlog == kLogTypeOutputOnly )
			ownsLogFile = ( dynamic_cast<ParallelFlatCoordinator*>(this) != NULL );
#endif

		if( ownsLogFile )
		{
			numOutFiles = 1;
			// NOTE(review): a parenthesized initializer on array-new is a compiler
			// extension before C++20 - confirm the toolchain accepts it.
			if( !Log::Default.logToStdOut() )
				outFileQ = new FileQueue[1]( string(Log::Default.filename() + id()).c_str());
			else
				outFileQ = new FileQueue[1]( "/dev/stdout" );
		}
		else
		{
			//for all other processors, we don't need to create log files
			numOutFiles = 0;
			outFileQ = NULL;
		}
	}
	else
	{
		//createLog() = false, i.e. type = logNone
		createlog = Log::logNone;	//Jacky Note: createlog = 0
	}
#else
	//For the NoTime kernel, use ofstreams
	if( Log::Default.createLog() )
	{
		numOutFiles = 1;
		createlog = Log::Default.logType();
		logIndex = 0;

		outFileQ = new ostream*[1];
		if( !Log::Default.logToStdOut() )
			outFileQ[0] = new fstream( string(Log::Default.filename() + id()).c_str(), ios::out );
		else
			outFileQ[0] = &cout;	// not owned, see the destructor
	}
	else
	{
		createlog = Log::logNone;
	}
#endif //end KERNEL_TIMEWARP
}

/*******************************************************************
* Function Name: executeProcess
* Description: fetches the next event with getEvent(), logs it when
*              the matching bit of createlog is set and dispatches it
*              to the receive() overload for its message type.
*              Messages obtained for Init, Internal, OutputSync,
*              Collect and Done events are deleted after dispatch;
*              External and Output messages are handed to receive()
*              as pointers and are not deleted here.
********************************************************************/
void ParallelProcessor::executeProcess()
{
#ifdef JACKY_DEBUG
	ostream& jacky_os = JackyDebugStream::Instance().Stream();
#endif

	//Receive an event and execute the corresponding function.
	TWMessage* receivedMsg;
	BasicEvent* event = getEvent();

	//#ifdef JACKY_DEBUG
	//let's see the address of the event that we are going to execute
	//jacky_os << "2006-02-03 PProcess => execute event address = " << event << endl << endl << flush;
	//#endif

	receivedMsg = (TWMessage*)event;
	//receivedMsg = (TWMessage*) getEvent();

	if( receivedMsg != NULL )
	{
		switch( receivedMsg->getMessageType() )
		{
		case TWMessage::TWInitMsg:
			{
				InitMessage *msg = ((TWInitMessage *)receivedMsg)->getMessage();
				if( createlog & Log::logInit )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
					writelog ( msg->time(), log );
				}
				//rollbackCheck( msg->time());
				receive( *msg );
				//showTheNextEvent(); //Nov. 3, 2005
				delete msg;
				break;
			}

		case TWMessage::TWInternalMsg:
			{
				InternalMessage *msg = ((TWInternalMessage *)receivedMsg)->getMessage();
				if( createlog & Log::logInternal )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
					writelog ( msg->time(), log );
				}
				//rollbackCheck( msg->time());
				receive( *msg );
				//showTheNextEvent(); //Nov. 3, 2005
				delete msg;
				break;
			}

		case TWMessage::TWOutputSyncMsg:
			{
				OutputSyncMessage *msg = ((TWOutputSyncMessage *)receivedMsg)->getMessage();
				if( createlog & Log::logOutput )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
					writelog ( msg->time(), log );
				}
				//rollbackCheck( msg->time());
				receive( *msg );
				//showTheNextEvent(); //Nov. 3, 2005
				delete msg;
				break;
			}

		case TWMessage::TWCollectMsg:
			{
				CollectMessage *msg = ((TWCollectMessage *)receivedMsg)->getMessage();
				if( createlog & Log::logCollect )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
					writelog ( msg->time(), log );
				}
				//rollbackCheck( msg->time());
				receive( *msg );
				//showTheNextEvent(); //Nov. 3, 2005
				delete msg;
				break;
			}

		case TWMessage::TWDoneMsg:
			{
				DoneMessage *msg = ((TWDoneMessage *)receivedMsg)->getMessage();
				if( createlog & Log::logDone )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
					writelog ( msg->time(), log );
				}
				//rollbackCheck( msg->time());
				receive( *msg );
				//showTheNextEvent(); //Nov. 3, 2005
				delete msg;
				break;
			}

		case TWMessage::TWExternalMsg:
			{
				BasicExternalMessage *msg = ((TWExternalMessage *)receivedMsg)->getMessage();

				/*Jacky: This received (X) msg can have different sources for different receivers:
				**       1> for NC        -> received (X) msg can only come from remote NCs on other machines
				**       2> for FC        -> received (X) msg must come from the local NC
				**       3> for simulator -> received (X) msg must come from the local FC
				**
				**       in case 2> & 3> the sending processors are local to the receiver, we can check the
				**       ProcessorDB and ModelDB to get the information needed for writing log files
				**
				**       in case 1> the sending processor (NC) is remote, and it does NOT exist in the ProcessorDB!
				*/

				//check whether this is a NC
				ParallelNodeCoordinator* ncptr = dynamic_cast<ParallelNodeCoordinator*>(this);

				// fcptr is needed by the debug output and by the infrequent state saving
				// bookkeeping below, so it is declared whenever either of them is compiled in.
#if defined(JACKY_DEBUG) || ( defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) )
				//check whether this is a FC
				ParallelFlatCoordinator* fcptr = dynamic_cast<ParallelFlatCoordinator*>(this);
#endif

#ifdef JACKY_DEBUG
				if( ncptr != NULL ){ //NC
					jacky_os << "!!!!!!!!! (X) msg received in NC " << ncptr->id() << endl << flush;
				}
				if( fcptr != NULL ){ //FC
					jacky_os << "!!!!!!!!! (X) msg received in FC " << fcptr->id() << endl << flush;
				}
#endif

				if( createlog & Log::logExternal )
				{
					string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";

#ifdef JACKY_DEBUG
					if( fcptr != NULL ){
						jacky_os << "^^^^^^^^ FC log string created: " << log << endl << flush ;
					}
#endif

					writelog ( msg->time(), log );

#ifdef JACKY_DEBUG
					if( fcptr != NULL ){
						jacky_os << "^^^^^^^^ FC writelog() returned! " << endl << flush ;
					}
#endif
				} //end if createlog & Log::logExternal

				//rollbackCheck( msg->time());

#ifdef JACKY_NCBAG_POINTERS //[2006-02-03]
				//before calling the receiving function on the NC, we need to save the address
				//of the BasicEvent into the NCBagReference of the NC's state
				if( ncptr != NULL ){ //NC
#ifdef JACKY_DEBUG
					jacky_os << "2006-02-03: Processor save <" << (event->recvTime).asString() << ", " << event
					         << "> to NCBagRef in NC " << ncptr->id() << endl << flush;
#endif
					(ncptr->getBagRef()).insert( make_pair(event->recvTime, event) );
				}
#endif //end JACKY_NCBAG_POINTERS

#if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15]
				//before calling the receiving function on the FC, we need to save the address
				//of the BasicEvent into the FCBagReference of the FC's state
				//Note: this is done only when Infrequent State Saving strategy is used!
				if( fcptr != NULL ){ //FC
#ifdef JACKY_DEBUG
					jacky_os << "2006-03-15: Processor save <" << (event->recvTime).asString() << ", " << event
					         << "> to FCBagRef in FC " << fcptr->id() << endl << flush;
#endif
					(fcptr->getFCBagRef()).insert( make_pair(event->recvTime, event) );
				}
#endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER

				receive( msg );
				//showTheNextEvent(); //Nov. 3, 2005
				break;
			}

		case TWMessage::TWOutputMsg:
			{
				BasicOutputMessage *msg = ((TWOutputMessage *)receivedMsg)->getMessage();

				if( createlog & Log::logOutput )
				{
					bool shouldLog = true;
#if defined(JACKY_UNIQUE_LOG) || defined(JACKY_SINGLE_LOG_FILE_LY) //[2006-04-26]
					//with -LY (createlog = 8) only the FC logs the (Y) messages
					if( createlog == kLogTypeOutputOnly )
						shouldLog = ( dynamic_cast<ParallelFlatCoordinator*>(this) != NULL );
#endif
					if( shouldLog )
					{
						string log = msg->asStringReceived() + " / " + model().description() + "(" + id() + ")" + "\n";
						writelog ( msg->time(), log );
					}
				}

				//rollbackCheck( msg->time());
				receive( msg );
				//showTheNextEvent(); //Nov. 3, 2005
				break;
			}
		}
	}
	else
	{
		//cout << "NULL MESSAGE!!" << endl;
		cout << "ParallelProcess::executeProcess() -> receivedMsg = NULL!!" << endl;

#ifdef JACKY_DEBUG
		jacky_os << endl << "ParallelProcess::executeProcess() -> receivedMsg = NULL!!" << endl << flush;
#endif

		MASSERTMSG( false, "ParallelProcess::executeProcess() -> receivedMsg = NULL!!" );
	}
}

/*******************************************************************
* Function Name: allocateState
* Description: returns a newly allocated ParallelProcessorState.
********************************************************************/
BasicState* ParallelProcessor::allocateState()
{
	return new ParallelProcessorState();
}

/*******************************************************************
* Function Name: sendTWMessage
* Description: stamps the destination on the kernel message, logs
*              the send when Log::logSent is enabled and hands the
*              message to sendEvent().
********************************************************************/
void ParallelProcessor::sendTWMessage( TWMessage* twmsg, const Message& msg, const ProcId &dest )
{
	twmsg->dest = dest;

	if( createlog & Log::logSent )
	{
		string log = msg.asStringSent(dest) + "\n";
		writelog( msg.time(), log );
	}

	/*
	#ifdef JACKY_DEBUG
	ostream& jacky_os = JackyDebugStream::Instance().Stream();
	jacky_os << "CCCCCCCCCCCCCCCCCC processor::sendTWMessage(), twmsg->TWExternalEventFlag = " << twmsg->TWExternalEventFlag
	         << " / msg.isEvent = " << msg.isEvent() << endl << flush;
	#endif
	*/

	sendEvent( twmsg );
}

/*******************************************************************
* Function Name: send
* Description: each overload wraps the DEVS message into the matching
*              TW message and sends it to 'dest' via sendTWMessage().
* Returns: this processor.
********************************************************************/
ParallelProcessor & ParallelProcessor::send( const InitMessage &msg, const ProcId &dest )
{
	TWInitMessage* newMsg = new TWInitMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const InternalMessage &msg, const ProcId& dest )
{
	TWInternalMessage* newMsg = new TWInternalMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const OutputSyncMessage &msg, const ProcId &dest )
{
	TWOutputSyncMessage* newMsg = new TWOutputSyncMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const CollectMessage &msg, const ProcId& dest )
{
	TWCollectMessage* newMsg = new TWCollectMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const BasicOutputMessage &msg, const ProcId &dest )
{
	// The kernel message is built with placement new inside a raw buffer that
	// has valueSize() extra bytes after the fixed part.
	int msgSize = sizeof(TWBasicOutputMessage) + msg.value()->valueSize();
	TWOutputMessage* newMsg = (TWOutputMessage*) new char[msgSize];
	new (newMsg) TWOutputMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const BasicExternalMessage &msg, const ProcId &dest )
{
#ifndef JACKY_REVISION
	// The send is "remote" iff ( (this machine != the machine for the model) && ( dest != NC.id for this machine) )
	// i.e., I'm sending something to a model located in a different machine, and the destination is not the local NC
	if( ( ParallelMainSimulator::Instance().getMachineID() != msg.port().model().machineForProcId(msg.port().model().masterId()) )
	    && ( dest != ParallelMainSimulator::Instance().nodeCoordinatorsList[ParallelMainSimulator::Instance().getMachineID()] ) )
	{
		cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		     << "] REMOTE BasicExternalMessage sent to ProcId " << dest
		     << " time=" << msg.time().asString() << endl ;
	}
	else
	{
		cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		     << "] LOCAL BasicExternalMessage sent to ProcId " << dest
		     << " time=" << msg.time().asString() << endl ;
	}
#else
	// Jacky's revision to show the debugging message
	//This is a REMOTE send if the destination procId does not exist in the local ParallelProcessorAdmin::ProcessorDB
	/*
	if( SingleParallelProcessorAdmin::Instance().findProcId(dest) ) { //LOCAL
		//cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		//     << "] LOCAL (X) msg sent to ProcId " << dest
		//     << " time=" << msg.time().asString() << endl ;
	} else { //REMOTE
		//cerr << "[" << ParallelMainSimulator::Instance().getMachineID()
		//     << "] REMOTE (X) msg sent to ProcId " << dest
		//     << " time=" << msg.time().asString() << endl ;
	}
	*/
#endif

	// Same variable-size construction as send( const BasicOutputMessage &, ... ).
	int msgSize = sizeof(TWBasicExternalMessage) + msg.value()->valueSize();
	TWExternalMessage* newMsg = (TWExternalMessage*) new char[msgSize];
	new (newMsg) TWExternalMessage(msg);

	/*
	#ifdef JACKY_DEBUG
	ostream& jacky_os = JackyDebugStream::Instance().Stream();
	jacky_os << "CCCCCCC processor::send(BasicExternalMessage, ProcId) : msg.isEvent = " << msg.isEvent()
	         << " / newMsg->TWExternalEventFlag = " << newMsg->TWExternalEventFlag << endl << flush;
	#endif
	*/

	sendTWMessage( newMsg, msg, dest );
	return *this;
}

ParallelProcessor & ParallelProcessor::send( const DoneMessage &msg, const ProcId &dest )
{
	TWDoneMessage* newMsg = new TWDoneMessage(msg);
	sendTWMessage( newMsg, msg, dest );
	return *this;
}

//Jacky Note [2006-04-26]
//Where the log data goes:
//  1. if JACKY_UNIQUE_LOG is not defined
//     -> original version OR enhanced version: the data is written to outFileQ[logIndex] of
//        this processor. In case of -LY, only the FC calls this function to log (Y) messages,
//        which is controlled in executeProcess().
//  2. if JACKY_UNIQUE_LOG is defined
//     -> if -LY, i.e. createlog = 8 => only the FC calls this function and the (Y) messages
//        are logged to the FC's own log file
//     -> if createlog != 8 => the data is written to the log file of the local NC
/*******************************************************************
* Function Name: writelog
* Description: appends 'line', stamped with 'time', to the log.
*              TimeWarp kernel: a FileData record is queued on the
*              selected FileQueue; nothing is written when that queue
*              was never created or, with infrequent state saving,
*              while getSuppressMessage() == 2 (coast forward).
*              NoTime kernel: the line is written and flushed directly.
********************************************************************/
void ParallelProcessor::writelog( const VTime& time, const string& line )
{
#ifdef KERNEL_TIMEWARP

#if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15]
	//[2006-03-15] Don't write to log files when we are in CoastForward Phase!
	//Jacky Note [2006-04-30]: with the unique log file strategy only a single log file is
	//created for the NC on each node. All the other processors may coast forward, and in
	//that case no data must be logged for them.
	if( getSuppressMessage() != 2 ){ //Not in CF phase
#endif

	// initialize() leaves outFileQ == NULL on processors that do not own a log file
	// while createlog stays enabled, so make sure the destination exists before
	// allocating and queueing the record.
#ifdef JACKY_UNIQUE_LOG
	const bool useNodeLog = ( createlog != kLogTypeOutputOnly );
	if( useNodeLog ? ( (ParallelMainSimulator::Instance().localNC)->outFileQ == NULL )
	               : ( outFileQ == NULL ) )
		return;
#else
	if( outFileQ == NULL )
		return;
#endif

	FileData *data = new FileData;
	data->time = time;
	data->length = line.length() + 1;
	data->line = new char[line.length()+1];
	line.copy( data->line, line.length());
	data->line[line.length()] = 0;	// string::copy() does not append the terminator

#ifdef JACKY_UNIQUE_LOG
	if( useNodeLog ) {
		//insert the log data into the outFileQ of the NC
		(ParallelMainSimulator::Instance().localNC)->outFileQ[logIndex].insert(data);
	} else {
		//createlog = 8: only the FC logs (Y) messages, in its own log file
		outFileQ[logIndex].insert(data);
	}
#else
	outFileQ[logIndex].insert(data);
#endif

#if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15]
	} //end if getSuppressMessage() != 2
#ifdef JACKY_DEBUG
	else { //getSuppressMessage() == 2: CoastForward operation, don't write to fileQ
		ostream& jacky_os = JackyDebugStream::Instance().Stream();
#ifdef JACKY_UNIQUE_LOG
		jacky_os << endl << "[Note: CoastForward phase -> No writing to outFileQ!] " << endl << flush;
#else
		jacky_os << endl << "[Note: CoastForward phase -> No writing to outFileQ[" << logIndex << "]!] " << endl << flush;
#endif
	}
#endif //end JACKY_DEBUG
#endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER

#else //NoTime Kernel
	//Precondition: A log file has been created
	*(outFileQ[logIndex]) << line << flush;
#endif //end KERNEL_TIMEWARP
}

#ifdef JACKY_DEBUG
/*******************************************************************
* Function Name: showTheNextEvent (Nov. 3, 2005)
* Description: writes to the debug stream the first event of
*              BasicTimeWarp::inputQ that is not marked as already
*              processed, or a NULL notice when there is none.
********************************************************************/
void ParallelProcessor::showTheNextEvent()
{
	ostream& jacky_os = JackyDebugStream::Instance().Stream();

	TWMessage* nextToExecuteMsg;
	BasicEvent* nextToExecuteEvent = BasicTimeWarp::inputQ.get();

	//skip the events that were already processed
	while( (nextToExecuteEvent != NULL) && (nextToExecuteEvent->alreadyProcessed == true) ){
		nextToExecuteEvent = nextToExecuteEvent->next;
	}

	if( nextToExecuteEvent != NULL ){
		nextToExecuteMsg = (TWMessage*)nextToExecuteEvent;
		Message* nextCDMsg = nextToExecuteMsg->getMessage();
		jacky_os << "CHECK!!! nextEvent -> id=" << nextToExecuteEvent->eventId
		         << " sign=" << nextToExecuteEvent->sign << " "
		         << nextToExecuteEvent->sender << "@" << nextToExecuteEvent->sendTime
		         << " =" << nextCDMsg->type() << "=> "
		         << nextToExecuteEvent->dest << "@" << nextToExecuteEvent->recvTime
		         << " Processed?" << nextToExecuteEvent->alreadyProcessed << endl << flush;
#ifdef JACKY_DESTROY_MESSAGE
		delete nextCDMsg;
#endif
	} else {
		jacky_os << "CHECK!!! nextEvent -> NULL!!!" << endl << flush;
	}
}
#endif //end JACKY_DEBUG