/******************************************************************* * * DESCRIPTION: class ParallelRoot * * AUTHOR: Alejandro Troccoli * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://atroccol@dc.uba.ar * mailto://liuqi@sce.carleton.ca * * DATE: 11/01/2001 * Revision Date: Sept. 6, 2005 *******************************************************************/ /** include files **/ #include "proot.h" // base #include "pRootState.h" #include "evaldeb.h" // ShowVirtualTimeWhenFinish #include "message.h" // class Message #include "pmodeladm.h" // class SingleParallelModelAdm #include "coupled.h" // class Coupled #include "parsimu.h" // class ParallelMainSimulator #include "log.h" // class Log #include "JackyDebugStream.h" //jacky-debug-mode & jacky-revision-mode /** private **/ #ifndef JACKY_REVISION //comment out this constructor ========================================== /******************************************************************* * Function Name: constructor Default ********************************************************************/ ParallelRoot::ParallelRoot( RootModel* rootmdl) : ParallelProcessor( rootmdl ) , topMasterId(ParallelProcessor::InvalidId) { //Get the master processor for the top model and set it up as the child //processor. Coupled &coupled( static_cast(SingleParallelModelAdm::Instance().model( "top" ) )) ; topMasterId = coupled.masterId(); } #else //Jacky's revision, new constructor & functions ========================================== /******************************************************************* * Function Name: Default Constructor * Description: this is Jacky's version of the default constructor ********************************************************************/ ParallelRoot::ParallelRoot( RootModel* rootmdl) : ParallelProcessor( rootmdl ) { } /******************************************************************* * Function Name: findTopOutputPorts() * Description: function to find out to which ports on the TOP model we * need to deliver a (Y) msg. i.e. output infomation to the * environment ********************************************************************/ ParallelRoot& ParallelRoot::findTopOutputPorts(const Port& msgPort){ const InfluenceList &influList( msgPort.influences() ); InfluenceList::const_iterator cursor( influList.begin() ); for(; cursor != influList.end(); cursor++) { if( !((*cursor)->model()).isAtomic()) { //destination port belongs to a coupled model if( ((*cursor)->model()).description() == "top" ){ //the receiving port belongs to the TOP model getTopOutputPorts().push_back((*cursor)); } else { // the port belongs to some other intermedial coupled model findTopOutputPorts(*(*cursor)); } } } return *this; } #ifdef JACKY_DEBUG //------------------------------------------------------------------ /******************************************************************* * Function Name: showTopOutputPorts() * Description: print out the output ports on the TOP model found * for debugging purpose ********************************************************************/ void ParallelRoot::showTopOutputPorts(ostream& out){ InfluenceList::const_iterator cursor( getTopOutputPorts().begin() ) ; out << endl << "[OUTPUT] ^^^^^ ParallelRoot: send output to these ports on TOP ^^^^^" << endl << flush; for( ; cursor != getTopOutputPorts().end(); cursor++ ) { out << (*cursor)->name() << "@" << ((*cursor)->model()).asString() << endl << flush; } } #endif //end JACKY_DEBUG ---------------------------------------------------------------- #endif //end JACKY_REVISION ==================================================================== /* public */ /******************************************************************* * Function Name: Instance * Description: get a single instance of the ParallelRoot ********************************************************************/ ParallelRoot &ParallelRoot::Instance() { return static_cast( SingleParallelProcessorAdmin::Instance().processor( ParallelProcessorAdmin::rootId) ) ; } /******************************************************************* * Function Name: initialize() * Description: required by the TimeWarp ********************************************************************/ void ParallelRoot::initialize() { //Create file queues numOutFiles = 0; #ifndef JACKY_UNIQUE_LOG //[2006-04-26] ----------- previous version ------------------------------------------ //In the previous version, we create log file for the Root if -l is given AND -LY is NOT given //if -LY is given, Root does not create log file (createlog = Log::logNone)! if (Log::Default.createLog() #ifdef JACKY_SINGLE_LOG_FILE_LY //[2006-04-20] && (Log::Default.logType() != 8) //if it is not -LY #endif //end JACKY_SINGLE_LOG_FILE_LY [2006-04-20] ) { numOutFiles++; createlog = Log::Default.logType(); logIndex = numOutFiles - 1; //logfile index = 0 } else { createlog = Log::logNone; //cout << "Jacky: Do Not create Log for Root!" << endl << flush; //if(Log::Default.logType() == 8){ // cout << "\tJacky: log type = 8, Do not Log for Root!" << endl << flush; //} } #else //[2006-04-26] -------------- new "One Log per Node" version ------------------- // In this new version, we do not create log file for the Root at all! // If Log::Default.createLog() = true -> // 1. if -LY is given, we only create log for the FCs to log (Y) msgs // 2. if -LY is not given, we only create log for the NCs to log all data // for all local processors including the Root! //i.e. A> if -LY is not given -> no log file for Root, but still log data for Root in the NC's log file! // B> if -LY is given -> no log file for Root & don't log data for Root if ( Log::Default.createLog() && (Log::Default.logType() != 8) ) { //A> no -LY //set createlog to the correct value based on command line options //so that Root's log data will be logged in ParallelProcessor into the NC's log file! createlog = Log::Default.logType(); //However, we leave numOutFiles = 0, i.e. we don't create log file for the Root //but we still need to set the logIndex = 0 logIndex = 0; //for logging into the NC's log file queue } else { //B> -LY or no log at all //set createlog = 0 so that no data will be logged for the Root createlog = Log::logNone; //numOutFiles is 0 as well, i.e. we don't create log file for the Root } //Now, 1. numOutFiles = 0 // 2. createlog = logType() [if createLog() and -LY is not given]; otherwise, createlog = 0 #endif //end JACKY_UNIQUE_LOG [2006-04-26] ------------------------------------------- //Root still needs to handle the output file! if (ParallelMainSimulator::Instance().outputName() != "/dev/null" ) { numOutFiles++; createOutput = true; outputIndex = numOutFiles - 1; } else { //cout << "Jacky: Do Not create Output for Root!" << endl << flush; createOutput = false; } //cout << "Root: numOutFiles = " << numOutFiles << " / logIndex = " << logIndex // << " / outputIndex = " << outputIndex << endl << flush; #ifdef KERNEL_TIMEWARP //============================================================================================ #ifdef JACKY_UNIQUE_LOG if ( Log::Default.createLog() && (Log::Default.logType() == 8) ) { //-l & -LY cout << "\t-LY is on, create log files for FlatCoordinators only." << endl; } else if ( Log::Default.createLog() ) { //-l cout << "\t-LY is off, create log files for NodeCoordinators only." << endl; } else { cout << "\tDo not create log files!" << endl; } #else #ifdef JACKY_SINGLE_LOG_FILE_LY if ( Log::Default.createLog() && (Log::Default.logType() == 8) ) { //-l & -LY cout << "\t-LY is on, create log files for FlatCoordinators only." << endl; } else if ( Log::Default.createLog() ) { //-l cout << "\tCreate log files." << endl; } else { cout << "\tDo not create log files!" << endl; } #else if ( Log::Default.createLog() ) { //-l cout << "\tCreate log files." << endl; } else { cout << "\tDo not create log files!" << endl; } #endif #endif if ( createOutput ) { cout << "Create output file for Root." << endl; } #ifndef JACKY_UNIQUE_LOG //[2006-04-26] ----------------------------------------------------------------------- #ifndef JACKY_SINGLE_LOG_FILE_LY //[2006-04-20] ************************************************* //Jacky: neither JACKY_UNIQUE_LOG nor JACKY_SINGLE_LOG_FILE_LY is defined! // A) Original Version if ( Log::Default.createLog() && createOutput ) { outFileQ = new FileQueue[2]; if (Log::Default.logToStdOut() ) { outFileQ[logIndex].open( "/dev/stdout"); cout << "Log to Standard Output" << endl << flush; } else { outFileQ[logIndex].open(string(Log::Default.filename() + id()).c_str()); cout << "Log to file " << string(Log::Default.filename() + id()) << " " << endl << flush; } outFileQ[outputIndex].open(ParallelMainSimulator::Instance().outputName().c_str()); } else if ( Log::Default.createLog() ) { outFileQ = new FileQueue[1]; if ( Log::Default.logToStdOut() ) { outFileQ[0].open("/dev/stdout"); cout << "Log to Standard Output" << endl << flush; } else { outFileQ[0].open( string(Log::Default.filename() + id()).c_str()); } } else if ( createOutput ) { outFileQ = new FileQueue[1]; //Jacky Note: the following line is modified!!! //outFileQ = new FileQueue[0]( ParallelMainSimulator::Instance().outputName().c_str()); outFileQ[0].open( ParallelMainSimulator::Instance().outputName().c_str() ); } #else //[2006-04-20] ************** if JACKY_SINGLE_LOG_FILE_LY is defined ********************** //Jacky: JACKY_SINGLE_LOG_FILE_LY is defined, but JACKY_UNIQUE_LOG is not defined // B) Original Version enhanced by -LY option //Note: in case of -LY, we have set createlog = Log::logNone if ( (createlog != Log::logNone) && (createOutput == true) ) { //create log & output (and it is not -LY) outFileQ = new FileQueue[2]; if (Log::Default.logToStdOut() ) { outFileQ[logIndex].open( "/dev/stdout"); cout << "Log to Standard Output" << endl << flush; } else { outFileQ[logIndex].open(string(Log::Default.filename() + id()).c_str()); cout << "Log to file " << string(Log::Default.filename() + id()) << " " << endl << flush; } //cout << "Jacky: output to [" << ParallelMainSimulator::Instance().outputName() << "]" << endl << flush; outFileQ[outputIndex].open(ParallelMainSimulator::Instance().outputName().c_str()); } else if ( createlog != Log::logNone ) { //create log only (and it is not -LY) outFileQ = new FileQueue[1]; if ( Log::Default.logToStdOut() ) { outFileQ[0].open("/dev/stdout"); cout << "Log to Standard Output" << endl << flush; } else { outFileQ[0].open( string(Log::Default.filename() + id()).c_str()); } } else if ( createOutput ) { //create output only outFileQ = new FileQueue[1]; outFileQ[0].open( ParallelMainSimulator::Instance().outputName().c_str() ); //cout << "Jacky: output to [" << ParallelMainSimulator::Instance().outputName() << "]" << endl << flush; } #endif //end JACKY_SINGLE_LOG_FILE_LY //[2006-04-20] ******************************************** #else //[2006-04-26] ------------------------- new "One Log per Node" version --------------------------------- //Jacky: JACKY_UNIQUE_LOG is defined // C) New Version // Note: this new version includes the -LY enhancement! //Note: we will NOT create log file for the Root in any case // And, if -LY is given, we will not log data for Root as well if ( createOutput ) { //only create output file for Root outFileQ = new FileQueue[1]; outFileQ[0].open( ParallelMainSimulator::Instance().outputName().c_str() ); //cout << "Jacky: output to [" << ParallelMainSimulator::Instance().outputName() << "]" << endl << flush; } #endif //end JACKY_UNIQUE_LOG [2006-04-26] -------------------------------------------------------------------- /* //If the TimeWarp kernel is being used if ( Log::Default.createLog() && createOutput) { outFileQ = new FileQueue[2]; if (Log::Default.logToStdOut() ) outFileQ[logIndex].open(string(Log::Default.filename() + id()).c_str()); else outFileQ[logIndex].open( "/dev/stdout"); outFileQ[outputIndex].open(ParallelMainSimulator::Instance().outputName().c_str()); } else if ( Log::Default.createLog() ) { if (Log::Default.logToStdOut() ) outFileQ[0].open(string(Log::Default.filename() + id()).c_str()); else outFileQ[0].open("dev/stdout"); } else if ( createOutput ) { outFileQ = new FileQueue[0]( ParallelMainSimulator::Instance().outputName().c_str()); } */ #else //If the NoTime kernel is being used =========================================================================== if ( Log::Default.createLog() && createOutput) { outFileQ = new ostream*[2]; if (Log::Default.logToStdOut() ) outFileQ[logIndex] = &cout; else outFileQ[logIndex] = new fstream(string(Log::Default.filename() + id()).c_str(), ios::out); outFileQ[outputIndex] = new fstream (ParallelMainSimulator::Instance().outputName().c_str(), ios::out); } else if ( Log::Default.createLog() ) { outFileQ = new ostream*[1]; if (Log::Default.logToStdOut() ) outFileQ[0] = &cout; else outFileQ[0] = new fstream(string(Log::Default.filename() + id()).c_str(), ios::out); } else if ( createOutput ) { outFileQ = new ostream*[1]; outFileQ[0] = new fstream( ParallelMainSimulator::Instance().outputName().c_str(), ios::out); } #endif //end KERNEL_TIMEWARP ======================================================================================= rootInitialize(); ParallelMainSimulator::Instance().debugStream() << "OK" << endl << flush; } /******************************************************************* * Function Name: rootInitialize * Description: clean event List and output device ********************************************************************/ ParallelRoot &ParallelRoot::rootInitialize() { lastChange( VTime::Zero ); nextChange( VTime::Inf ); #ifndef JACKY_REVISION //the following 3 lines are removed ======================= sendMsgType( ParallelRootState::CollectMsg ); externalEvents.erase( externalEvents.begin(), externalEvents.end() ) ; stopTime( VTime::Inf ) ; #endif //end JACKY_REVISION ====================================================== #ifdef JACKY_REVISION getTopOutputPorts().clear(); //clear the topOutputPorts list #endif return *this; } /******************************************************************* * Function Name: rootSimulate * Description: Jacky's revision -> the ParallelRoot begins the simulation * by simply sending (I) msgs to all NCs ********************************************************************/ ParallelRoot &ParallelRoot::rootSimulate() { #ifndef JACKY_REVISION //comment out the following =================================== //Jacky: The original version for sorting external events in the ParallelRoot, and // sending (I) msg to the ParallelMCoordinator for the TOP model externalEvents.sort(); eventsBegin(); lastChange( VTime::Zero ); InitMessage initMsg( VTime::Zero, id() ) ; this->send( initMsg, topMasterId ) ; #endif //end JACKY_REVISION ============================================================ //Jacky: the external events are no longer loaded into the ParallelRoot! // sort() is called for each NC in the ParallelMainSimulator! // this function simply sending (I) msgs to all NCs to start the simulation! // the ParallelMCoordinator for the TOP model will Not get InitMessage anymore. InitMessage initMsgNC( VTime::Zero, id() ) ; for (int i = 0; i < SingleParallelModelAdm::Instance().totalMachinesCount(); i++) { this->send( initMsgNC, ParallelMainSimulator::Instance().nodeCoordinatorsList[i]) ; } return *this; } /******************************************************************* * Function Name: rootStop * Description: show the stop information * Jacky Note: this function is left here (maybe we want to show the stop * information when the simulation ends), but the stop of the * simulation is NOT controlled by the ParallelRoot ********************************************************************/ ParallelRoot &ParallelRoot::rootStop() { if (ShowVirtualTimeWhenFinish().Active()) cerr << "\n" << lastChange() << "\n"; return *this; } #ifndef JACKY_REVISION //this is the previous version ================================ /******************************************************************* * Function Name: receive(BasicOutputMessage*) * Description: send the time and value of the message to the environment ********************************************************************/ ParallelProcessor &ParallelRoot::receive( const BasicOutputMessage *msg ) { cerr << "[" << ParallelMainSimulator::Instance().getMachineID() << "] (Y) msg received in Root at time " << msg->time().asString() << endl; //If an output file has been opened if (createOutput) { string line = msg->time().asString() + " " + msg->port().name() + " " + msg->value()->asString() + "\n"; #ifdef KERNEL_TIMEWARP FileData *data = new FileData; data->time = msg->time(); data->length = line.length()+1; data->line = new char[line.length()+1]; line.copy( data->line, string::npos); data->line[line.length()] = 0; outFileQ[outputIndex].insert(data); #else (*outFileQ[outputIndex]) << line; #endif } delete msg; return *this; } #else //this is jacky's revision =========================================================== /******************************************************************* * Function Name: receive(BasicOutputMessage*) * Description: send the time and value of the message to the environment * Jacky Note: In the revision, the ParallelRoot need to find out to which * ports on the TOP model the received (Y) msg should be delivered. * Since the (Y) msg was sent by a ParallelSimulator and forwarded * to the ParallelRoot via FC and NC, the msg->port() is not necessarily * an output port on the TOP model. We need a function to find out * all the output ports on the TOP model that are the ultimate * destination of the received (Y) msg! ********************************************************************/ ParallelProcessor &ParallelRoot::receive( const BasicOutputMessage *msg ) { //cerr << "[" << ParallelMainSimulator::Instance().getMachineID() // << "] (Y) msg received in Root at time " << msg->time().asString() << endl; #ifdef JACKY_DEBUG ostream& jacky_os = JackyDebugStream::Instance().Stream(); #endif #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ------------------------------- //[2006-03-15] //If we are in CoastForward Phase, do nothing in this function! //We should not write to output files during coast forwarding since these lines have been written during the //original execution! if( getSuppressMessage() != 2 ){ //Not in Coast Forward Phase, do normal processing #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] ---------------------------------------- MASSERTMSG( getTopOutputPorts().size() == 0, "Root::receive(BasicOutputMessage*) -> Garbage in Root::topOutputPorts!" ); //If an output file has been opened if (createOutput) { //fill the topOutputPorts with all output ports on //the TOP model that this (Y) msg will be delivered findTopOutputPorts(msg->port()); MASSERTMSG( getTopOutputPorts().size() > 0, "Root::receive(BasicOutputMessage*) -> Wrong (Y) msg received!" ); #ifdef JACKY_DEBUG showTopOutputPorts(jacky_os); #endif //for each port on the TOP model that will receive this (Y) msg, we need to create a line InfluenceList::const_iterator cursor( getTopOutputPorts().begin() ) ; for( ; cursor != getTopOutputPorts().end(); cursor++ ) { string line = msg->time().asString() + " " + (*cursor)->name() + " " + msg->value()->asString() //#ifdef JACKY_DEBUG + "\t[DEBUG: from " + (msg->port()).name() + "@" + (msg->port()).model().asString() + "]" //#endif + "\n"; #ifdef JACKY_DEBUG jacky_os << "output => " << msg->time().asString() << " " << (*cursor)->name() << " " << msg->value()->asString() << "\t[DEBUG: from " << (msg->port()).name() << "@" << (msg->port()).model().asString() << "]" << endl << flush; #endif #ifdef KERNEL_TIMEWARP //-------------------------------------------------------------------------- FileData *data = new FileData; data->time = msg->time(); data->length = line.length()+1; data->line = new char[line.length()+1]; line.copy( data->line, string::npos); data->line[line.length()] = 0; outFileQ[outputIndex].insert(data); #else //notime kernel ------------------------------------------------------------------------------ (*outFileQ[outputIndex]) << line; #endif //end #ifdef KERNEL_TIMEWARP ---------------------------------------------------------------- } //end for loop //clean the topOutputPorts getTopOutputPorts().clear(); #ifdef JACKY_DEBUG jacky_os << "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" << endl << flush; #endif } //end if(createOutput) #if defined(JACKY_INFREQ_STATEMANAGER) && defined(INFREQSTATEMANAGER) //[2006-03-15] ---------------------------- } //end getSuppressMessage() != 2 #ifdef JACKY_DEBUG //[2006-03-15] else { //if getSuppressMessage() == 2 //we are in CoastForward Operation, don't write to output files! jacky_os << endl << "[Note: Root in CoastForward -> No writing to output files!!!] " << endl << flush; } #endif #endif //end JACKY_INFREQ_STATEMANAGER && INFREQSTATEMANAGER //[2006-03-15] -------------------------------------- delete msg; return *this; } #endif //end JACKY_REVISION //================================================================ /*Jacky Note: the following functions are removed in the new version!*/ #ifndef JACKY_REVISION /******************************************************************* * Function Name: receive * Description: advance the time and go on with the simulation ********************************************************************/ ParallelProcessor &ParallelRoot::receive( const DoneMessage &msg ) { if ( lastChange() < msg.time() ){ cerr << "At " << msg.time().asString() << " Root lastChange() < msg.time() !" << endl; } cerr << "Root receives (D) msg at " << msg.time().asString() << " / nextChange()=" << msg.nextChange().asString() << endl; //We have to find out if the DoneMessage corresponds to the //a response to a CollectMessage or to a InternalMessage lastChange( msg.time() ); nextChange( msg.nextChange() ); //If we have to send an Internal Message, send it now! if ( sendMsgType() == ParallelRootState::InternalMsg ) { //It should be the case that absoluteNext == msg.time(); InternalMessage intMsg( absoluteNext(), id()); send ( intMsg, topMasterId ); //Calculate GVT //Set the next message type to Collect sendMsgType ( ParallelRootState::CollectMsg ); } else {//See if there are any external events to send before the collect if( endOfEvents() && nextChange()==VTime::Inf ) { rootStop() ; } else { // is there any event? Event ev( VTime::Inf ) ; if( !endOfEvents() ) ev = *(currentEvent()) ; //Jacky: now, if there are external events, ev is the current event (with MIN time among all events); //if there is no external event, ev is a dummy Event with time=INF, port=NULL, value=0 VTime nt( absoluteNext() ); //If there is an external event prior to an /at the same time as //an Internal transition then send the external messages //before sending the CollectMessage if( ev.time < absoluteNext() ) { nt = ev.time; } //if //Jacky: thus, nt is MIN(nextExternalEventTime, nextStateChangeTime) //Should we go on? if ( stopTime() < nt ) { rootStop() ; } else {//Jacky: either there is an external event to be processed before stoptime, // or there is an internal state change before stop time //Send all the external msgs. while ( !endOfEvents() && ev.time == nt ) { ExternalMessage extMsg( ev.time, id(), *ev.port, ev.value.value() ); send( extMsg, topMasterId ); eventsMoveNext() ; if ( !endOfEvents() ) ev = *(currentEvent()) ; } //while //If we have also an internal transition, then send the Collect message if ( nt == absoluteNext() ) { CollectMessage collectMsg = CollectMessage( nt, id() ) ; //Set the next message type to Internal sendMsgType ( ParallelRootState::InternalMsg ); send ( collectMsg, topMasterId ); } else { //Set the time for next change to now. lastChange( nt ); nextChange( VTime::Zero ); InternalMessage intMsg( absoluteNext(), id()); send ( intMsg, topMasterId ); //Set the next message type to Collect sendMsgType ( ParallelRootState::CollectMsg ); } }//if }//if stop }//if sendMsgType == Internal return *this; } /******************************************************************* * Function Name: addExternalEvent * Description: add external event ********************************************************************/ ParallelRoot &ParallelRoot::addExternalEvent( const VTime &t, const Port &p, const Real &r ) { externalEvents.push_back( Event(t, &p, r) ); return *this; } #endif //end #ifndef JACKY_REVISION