/******************************************************************* * * DESCRIPTION: class ParallelMCoordinator2 * * AUTHOR: Alejandro Troccoli * * EMAIL: mailto://atroccol@dc.uba.ar * * DATE: 16/11/2000 * *******************************************************************/ /** include files **/ #include "pmcoordin2.h" // header #include "coupled.h" // class Coupled #include "pprocadm.h" #include "pmessage.h" #include "parsimu.h" //class ParallelMainSimulator /******************************************************************* * Function Name: Constructor ********************************************************************/ ParallelMCoordinator2::ParallelMCoordinator2( Coupled * coupled ) : ParallelCoordinator(coupled) { //Processor is master, then parent is parent model's processor. parentId = coupled->parentId(); } /******************************************************************* * Function Name: initialize ********************************************************************/ void ParallelMCoordinator2::initialize() { ParallelCoordinator::initialize(); //Add the slave's processor list to the dependants list Coupled& coupled = static_cast (model()); for(ModelPartition::const_iterator mcursor = coupled.modelPartition().begin(); mcursor != coupled.modelPartition().end(); mcursor++) { if ( id() != mcursor->second ) //Add processor id to the slave list. slaves() [mcursor->second] = VTime::Inf ; } ParallelMainSimulator::Instance().debugStream() << "OK" << endl << flush; } /******************************************************************* * Function Name: receive * Description: forward the init message to all dependants and slaves ********************************************************************/ ParallelProcessor &ParallelMCoordinator2::receive( const InitMessage &msg ) { //cout << "In function ParallelMCoordinator2::receive(InitMessage)"<send( init , cursor->first ); //Now, send to dependants. for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) this->send( init , cursor->first ); return *this; } /******************************************************************* * Function Name: receive * Description: sort the external messages and send the @ message to the inminent childs ********************************************************************/ ParallelProcessor &ParallelMCoordinator2::receive( const CollectMessage &msg ) { //cout << "In function ParallelCoordinator::receive(CollectMessage)"<first ); } for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) { if ( cursor->second == msg.time() ) { synchronizeList.insert( cursor->first ); doneCount( doneCount() + 1); send( collect , cursor->first ); }//if } return *this; } /******************************************************************* * Function Name: receive * Description: sends the * message to the inminent child ********************************************************************/ ParallelProcessor &ParallelMCoordinator2::receive( const InternalMessage &msg ) { MASSERTMSG( doneCount() == 0, "Received an InternalMessage and doneCount is not cero!" ); //1. Sort the external message queue for( MessageBag::iterator extMsgs = externalMsgs.begin(); extMsgs != externalMsgs.end(); extMsgs++ ) { sortExternalMessage( *((BasicExternalMessage*) (*extMsgs))); } //2. Empty queue externalMsgs.eraseAll(); //3. Send all the internal messages InternalMessage internal( msg.time(), id() ) ; ParallelCoordinatorState::DependantList::const_iterator slavesCursor; //First, send message to ALL slave coordinators for( slavesCursor = slaves().begin() ; slavesCursor != slaves().end() ; slavesCursor++ ) { doneCount( doneCount() + 1); send( internal , slavesCursor->first ); } //Now, send to the imminent dependants. ProcSet::const_iterator cursor; for( cursor = synchronizeList.begin(); cursor != synchronizeList.end() ; cursor++ ) { doneCount( doneCount() + 1); send( internal, *cursor ) ; } //Clear the synchronize set synchronizeList.erase( synchronizeList.begin(), synchronizeList.end()); return *this; } /******************************************************************* * Function Name: receive * Description: sends an X message to the port's influences ********************************************************************/ ParallelCoordinator &ParallelMCoordinator2::sortExternalMessage( const BasicExternalMessage &msg ) { //cout << "In function ParallelMCoordinator2::receive(ExternalMessage)"<model().isLocalMaster()) { synchronizeList.insert( (*cursor)->model().localProc() ); xMsg.port( *(*cursor) ) ; send( xMsg,(*cursor)->model().localProc() ) ; } else if( msg.procId() == parentId ){ //When forwarding a message, do not translate the port const ProcId& slaveId = (*cursor)->model().parentId(); if (procs.find( slaveId ) == procs.end() ) { //synchronizeList.insert( slaveId ); procs.insert(slaveId); xMsg.port( msg.port()); send( xMsg, slaveId ); } // if }//else }//for return *this ; } /******************************************************************* * Function Name: receive * Description: translates the output event to a X messages for the * influenced children and to a Y message for his father ********************************************************************/ ParallelCoordinator &ParallelMCoordinator2::sortOutputMessage( const BasicOutputMessage &msg ) { //cout << "In function ParallelMCoordinator2::receive(OutputMessage)"<( model() ) ); ProcSet procs; //Remote procs that have already received an external msg. const InfluenceList &influList( msg.port().influences() ); InfluenceList::const_iterator cursor( influList.begin() ); BasicOutputMessage outMsg( msg ); outMsg.procId( id() ); BasicExternalMessage extMsg; extMsg.time( msg.time() ); extMsg.procId( id() ); extMsg.value( msg.value()->clone() ); extMsg.senderModelId( msg.port().modelId() ); for( ; cursor != influList.end(); cursor++ ) { // Search for the port in the ouput ports list. // If found, send message to parent model. // If not found in the output ports list, then it must be translated to an external message // If so, check if the recipient has a local master processor. // If the recipient does not have a local master processor, find the // corresponding slave processor and send the message. if( coupled.outputPorts().find( (*cursor)->id() ) == coupled.outputPorts().end() ) { if ( (*cursor)->model().isLocalMaster()) { synchronizeList.insert((*cursor)->model().localProc()); extMsg.port( * (*cursor) ); send( extMsg, (*cursor)->model().localProc()); } else if( !isFromSlave ) { //When forwarding a message, do not translate the port //const ProcId& slaveId = parentSlaveForModel( (*cursor)->model() ); const ProcId& slaveId = (*cursor)->model().parentId(); //Two conditions must be satisfied for a message to be forwarded //to a slave processor: //1. The slave should not be the sender of the output message //2. The slave should not have already been sent the message if (slaveId != msg.procId() && procs.find( slaveId ) == procs.end() ) { //synchronizeList.insert( slaveId ); procs.insert(slaveId); extMsg.port( msg.port()); send( extMsg, slaveId ); } // if } } else { outMsg.port( * (*cursor) ); send( outMsg, parentId ); } }//for return *this; } /******************************************************************* * Function Name: receive * Description: recalculates the inminent child ********************************************************************/ ParallelProcessor &ParallelMCoordinator2::receive( const DoneMessage &msg ) { //cout << "In function ParallelCoordinator::receive(DoneMessage)"< 0, "Unexpected Done message!" ); doneCount( doneCount() - 1); //Update the depdendant's absolute next time! if (msg.isFromSlave()) { slaves()[msg.procId()] = msg.time() + msg.nextChange(); } else { dependants()[msg.procId()] = msg.time() + msg.nextChange(); } if( doneCount() == 0 ) { lastChange ( msg.time() ); nextChange( calculateNextChange( msg.time() ) ); DoneMessage doneMsg( msg.time(), id(), nextChange(), false ); send( doneMsg, parentId ); } return *this; } /******************************************************************* * Function Name: calculateNextChange * Description: calculate the time for the next change ********************************************************************/ VTime ParallelMCoordinator2::calculateNextChange(const VTime& time) const { VTime next(VTime::Inf); ParallelCoordinatorState::DependantList::const_iterator cursor; //Check slaves for( cursor = slaves().begin() ; cursor != slaves().end() ; cursor++ ) if( cursor->second < next) next = cursor->second; //Check dependants. for( cursor = dependants().begin() ; cursor != dependants().end() ; cursor++ ) if( cursor->second < next) next = cursor->second; next -= time; return next; } /******************************************************************* * Function Name: allocateState ********************************************************************/ BasicState* ParallelMCoordinator2::allocateState(){ ParallelMCoordinatorState *state = new ParallelMCoordinatorState; state->dependants = new ParallelCoordinatorState::DependantList; state->slaves = new ParallelCoordinatorState::DependantList; return state; }