/*******************************************************************
*
*  DESCRIPTION: class Coordinator
*
*  AUTHOR: Amir Barylko & Jorge Beyoglonian
*  VERSION 2: Daniel A. Rodriguez
*
*  EMAIL: mailto://amir@dc.uba.ar
*         mailto://jbeyoglo@dc.uba.ar
*         mailto://drodrigu@dc.uba.ar
*
*  DATE: 27/6/1998
*  DATE: 4/6/1999 (v2)
*
*******************************************************************/

/** include files **/
#include "coordin.h"       // header
#include "msgadm.h"        // SingleMsgAdmin
#include "coupled.h"       // class Coupled
#include "procadm.h"

/*******************************************************************
* Function Name: Coordinator
* Description: builds the coordinator for the given coupled model.
*   - takes coupled->localProc() as this processor's id
*   - resets doneCount to 0
*   - for every child model of 'coupled' that reports isLocalMaster(),
*     stores Time::Inf in 'dependents' under that child's localProc()
* Note: 'coupled' is dereferenced unconditionally, so it must not be
*       a null pointer.
********************************************************************/
//Rami
//	, receivedX( 1 )
//	, inminent( Processor::InvalidId )
Coordinator::Coordinator( Coupled *coupled )
	: Processor( coupled )
{
	this->Processor::id( coupled->localProc() );
	doneCount( 0 );

	for( Coupled::DModelList::const_iterator child = coupled->childModels().begin();
	     child != coupled->childModels().end();
	     ++child )
	{
		// Children that are not local masters are not tracked as dependents.
		if( !(*child)->isLocalMaster() )
			continue;

		dependents[ (*child)->localProc() ] = Time::Inf;
	}
}

/*******************************************************************
* Function Name: receive
* Description: forward the init message to his childrens
*
********************************************************************/
///////////////////////////////////////////////Rami//////////////////////////////////////////
/////////// This will be overriden by the MasterCoordinator and SlaveCoordinator ///////////
/*
Processor &Coordinator::receive( const InitMessage &msg )
{
	doneCount( dependents.size() );
	InitMessage init( msg.time() , this->Processor::id() );
	Coordinator::DependentList::const_iterator cursor;
	cursor = dependents.begin();
	for (; cursor != dependents.end() ; cursor++)
		SingleMsgAdm::Instance().send( init , cursor->first );
	return *this;
}
*/
///////////////////////////////////////////////////////////////////////////////////////////// /***************************************************************************************** * Function Name : sortExternalMessages(const ExternalMessage &msg) * Description : sends the messages in the bag to their destinations and inserts them * in the synchronizeSet (so that they receive * messages and execute their externalFunctions *****************************************************************************************/ //////////////////////////////////////////////Rami//////////////////////////////////////////// ////////////////////////// This will be overriden by the Master and Slave coordinator//////// /* Coordinator &Coordinator::sortExternalMessage(const ExternalMessage &msg) { const InfluenceList &influList( msg.port().influences() ); InfluenceList::const_iterator cursor( influList.begin() ); ExternalMessage xMsg(msg); xMsg.procId( this->Processor::id() ); for (; cursor != influList.end() ; cursor++) { if ( ((*cursor)->model()).isLocalMaster() ) { synchronizeList.insert( ((*cursor)->model()).localProc() ) ; xMsg.port(*(*cursor)); SingleMsgAdm::Instance().send(xMsg, ((*cursor)->model()).localProc() ); } } return *this; } */ ////////////////////////////////////////////////////////////////////////////////////////////// /**************************************************************************************** Function Name : sortOutputMessage(const OutputMessage &msg) Description : maps output messages of children into external message to other children (and insert those children in synchronizeList) or pass them as output to the parent model ****************************************************************************************/ ////////////////////////////////////////////////////////////////////////////////////////////// ////////////////Rami: This will be overriden by the Master and Slave Coordinators //////////// /* Coordinator &Coordinator::sortOutputMessage(const 
OutputMessage &msg) { Coupled &coupled( static_cast< Coupled & >( model() ) ); const InfluenceList &influList(msg.port().influences() ); InfluenceList::const_iterator cursor( influList.begin() ); OutputMessage outMsg( msg ); outMsg.procId( this->Processor::id() ); ExternalMessage extMsg; extMsg.time( msg.time() ); extMsg.procId( this->Processor::id() ); extMsg.value( msg.value() ); ModelId senderId = msg.port().modelId(); extMsg.senderModelId( senderId ); cout << "in sortOutputMessage senderID is " << extMsg.senderModelId() << endl; for (; cursor != influList.end() ; cursor++) { if (coupled.outputPorts().find( (*cursor)->id() ) == coupled.outputPorts().end() ) { if ( ((*cursor)->model()).isLocalMaster() ) { synchronizeList.insert( ((*cursor)->model()).localProc() ) ; extMsg.port( *(*cursor) ); SingleMsgAdm::Instance().send( extMsg , ((*cursor)->model()).localProc() ); } } else { outMsg.port( *(*cursor) ); SingleMsgAdm::Instance().send(outMsg, coupled.parentId() ); } } return *this; } */ ////////////////////////////////////////////////////////////////////////////////////////////// /*************************************************************************************** * Function Name: receive(const CollectMessage &msg) * Description: sends collect messages to imninent children and insert them into * synchronizeList (so that they receive * message and execute their internal tranisition) ***************************************************************************************/ /////////////////////////////////////////////Rami///////////////////////////////////////////// ///////////////////////////This will be overriden by the master and slave coordinators//////// /* Processor &Coordinator::receive(const CollectMessage &msg) { MASSERT( msg.time() == absoluteNext() ); lastChange( msg.time() ) ; nextChange( Time::Zero ) ; Coordinator::DependentList::const_iterator cursor; CollectMessage collect( msg.time(), this->Processor::id() ); for ( cursor = dependents.begin() ; 
cursor != dependents.end() ; cursor++) { if (cursor->second == msg.time()) { synchronizeList.insert(cursor->first); doneCount(doneCount() + 1); SingleMsgAdm::Instance().send(collect, cursor->first); } } return *this; } */ ////////////////////////////////////////////////////////////////////////////////////////////// /******************************************************************* * Function Name: receive(const InternalMessage &msg) * Description: removes the messages from the message bag and forward them using * sortExternalMessage function. Send * messages to models in synchronizeList ********************************************************************/ Processor &Coordinator::receive( const InternalMessage &msg ) { MASSERTMSG( doneCount() == 0 , "Received an InternalMessage and doneCount is not zero!"); for (MessageBag::iterator extMsgs = externalMsgs.begin(); extMsgs != externalMsgs.end(); extMsgs++) { sortExternalMessage( *( (ExternalMessage*)(*extMsgs) ) ); } externalMsgs.eraseAll(); ProcSet::const_iterator cursor; InternalMessage internal(msg.time(), this->Processor::id()); // Rami: synchronizeList will have the modelIds of the models which : // 1) Received an external message, through sortExternalMessage function // 2) Received an external message, through sortOutputMessage function // 3) Scheduled for an internal transition, through receive(CollectMessage) function for( cursor = synchronizeList.begin() ; cursor != synchronizeList.end() ; cursor++) { doneCount( doneCount() + 1); SingleMsgAdm::Instance().send(internal, *cursor); } synchronizeList.erase(synchronizeList.begin(), synchronizeList.end()); // cout << "Coordinator(" << this->description() << ") done count is : " << doneCount() << " at " << lastChange() << endl; return *this; } /**************************************************************************************** * Function Name: receive (const OutputMessage &msg) * Description: routes output messages using sortOutputMessage() function 
****************************************************************************************/ Processor &Coordinator::receive( const OutputMessage &msg ) { sortOutputMessage(msg); return *this; } Processor &Coordinator::receive( const InitMessage &msg) { InvalidMessageException e; e.addText("The Coordinator class can NOT receive initMessage, it has to be received bt M/S Coordinator" ) ; MTHROW(e); return *this; } Processor &Coordinator::receive( const CollectMessage &msg) { InvalidMessageException e; e.addText("The Coordinator class can NOT receive CollectMessages, it has to be received by M/S Coordinator"); MTHROW(e); } Processor &Coordinator::receive( const DoneMessage &msg) { InvalidMessageException e; e.addText("The Coordinator class can NOT receive DoneMessages , it has to be received by M/S Coordinator"); MTHROW(e); } Processor &Coordinator::sortExternalMessage(const ExternalMessage &msg) { InvalidMessageException e; e.addText( "The Coordinator class can NOT sort External Messages, this has to be done by a master coordinator or slave coordinator" ) ; MTHROW(e); } Processor &Coordinator::sortOutputMessage(const OutputMessage &msg) { InvalidMessageException e; e.addText("The coordinator class can NOT sort Output Messages, this has to be done by a master coordinator or slave coordinator" ); MTHROW(e); } /***************************************************************************************** * Function Name: receive (const DoneMessage &msg) * Description: receive Done messages fromd dependents and updates the dependents list * with the next change time for each dependent ******************************************************************************************/ ////////////////////////////////////////////Rami///////////////////////////////////////////// /////////////////This will be overriden by the master and slave coordinators///////////////// /* Processor &Coordinator::receive( const DoneMessage &msg ) { MASSERTMSG( doneCount() > 0 , "Unexpected Done Message!"); 
Coupled &coupled = static_cast< Coupled & >( model() ); doneCount( doneCount() - 1); //Rami // dependents[ SingleProcessorAdmin::Instance().processor(msg.procId()).model().id() ] = msg.time() + msg.nextChange(); dependents[ msg.procId() ] = msg.time() + msg.nextChange() ; if( doneCount() == 0) { lastChange (msg.time()); nextChange( calculateNextChange( msg.time() ) ); DoneMessage doneMsg (msg.time(), this->Processor::id() , nextChange()); SingleMsgAdm::Instance().send(doneMsg, coupled.parentId()); } return *this; } */ ////////////////////////////////////////////////////////////////////////////////////////////// /******************************************************************* * Function Name: id ********************************************************************/ //Rami /*Processor &Coordinator::id( const ProcId &pid ) { this->Processor::id( pid ); MASSERT( pmodel() ); pmodel()->id( pid ); return *this; } */ /**************************************************************************************** Function Name: calculateNextChange(const Time& time) const Description : calculate the next change period of the imninent dependents ****************************************************************************************/ Time &Coordinator::calculateNextChange(const Time& time) const { InvalidMessageException e; e.addText("The class Coordinator can NOT execute calculateNextChange"); MTHROW(e); /* Time next(Time::Inf); Coordinator::DependentList::const_iterator cursor; for ( cursor = dependents.begin(); cursor != dependents.end() ; cursor++) { if(cursor->second < next) next = cursor->second; } next -= time; */ }