/******************************************************************* * * DESCRIPTION: class Coordinator * * AUTHOR: Amir Barylko & Jorge Beyoglonian * VERSION 2: Daniel A. Rodriguez * * EMAIL: mailto://amir@dc.uba.ar * mailto://jbeyoglo@dc.uba.ar * mailto://drodrigu@dc.uba.ar * * DATE: 27/6/1998 * DATE: 4/6/1999 (v2) * *******************************************************************/ /** include files **/ #include "coordin.h" // header #include "msgadm.h" // SingleMsgAdmin #include "coupled.h" // class Coupled #include "procadm.h" #include "message.h" #include "msgbag.h" /******************************************************************* * Function Name: Coordinator ********************************************************************/ Coordinator::Coordinator( Coupled *coupled ) : Processor( coupled ) , receivedX( 1 ) , inminent( Processor::InvalidId ) { doneCount(0); Coupled &coupled = static_cast< Coupled & >( model() ); Coupled::ModelList::const_iterator cursor; for ( cursor = coupled.children().begin(); cursor != coupled.children().end(); cursor++) { dependents[ (*cursor) ] = Time::Inf; } } /* Coordinator::Initialize() { doneCount(0); Coupled & coupled = static_cast< Coupled & > (model()); Coupled::ModelList::const_iterator cursor; // Rami: This is assumes that the modelid is the same as the process id associated with the model for( cursor = coupled.children().begin(); cursor != coupled.children().end(); cursor++) { dependents[ (*cursor)->id() ] = Time::Inf; } } */ /******************************************************************* * Function Name: receive * Description: forward the init message to his childrens ********************************************************************/ Processor &Coordinator::receive( const InitMessage &msg ) { Coupled &coupled( static_cast( model() ) ); //Rami doneCount( coupled.children().size() ); Coupled::ModelList::const_iterator cursor; InitMessage init( msg.time(), this->Processor::id() ); for( cursor = coupled.children().begin() ; cursor != coupled.children().end() ; cursor++ ) SingleMsgAdm::Instance().send( init , *cursor ); return *this; } /******************************************************************* * Function Name: receive * Description: sends a X message to the port's influences ********************************************************************/ /* Rami Processor &Coordinator::receive( const ExternalMessage &msg ) { const InfluenceList &influList( msg.port().influences() ) ; InfluenceList::const_iterator cursor( influList.begin() ) ; // Old Code ////////////////////////////////////////////////////////// //MASSERT( !doneCount ); //doneCount = influList.size() ; // No permite eventos externos consecutivos ////////////////////////////////////////////////////////////////////// // New Code ////////////////////////////////////////////////////////// //Rami if (doneCount() == 0) receivedX = 1; else receivedX++; doneCount(doneCount() + influList.size()) ; ////////////////////////////////////////////////////////////////////// if( doneCount() > 0 ) { ExternalMessage xMsg( msg ) ; xMsg.procId( this->Processor::id() ) ; for( ; cursor != influList.end(); cursor++ ) { xMsg.port( *(*cursor) ) ; SingleMsgAdm::Instance().send( xMsg, (*cursor)->model() ) ; } } else { lastChange( msg.time() ) ; if( inminentChild() != Processor::InvalidId ) { nextChange( model().absoluteNext() - msg.time() ) ; Processor &proc( SingleProcessorAdmin::Instance().processor( inminentChild() ) ) ; nextChange( proc.model().absoluteNext() - msg.time() ) ; } DoneMessage doneMsg( msg.time(), this->Processor::id(), nextChange() ) ; SingleMsgAdm::Instance().send( doneMsg, model().parentId() ) ; } return *this ; } */ //Rami Processor &Coordinator::receive(const CollectMessage &msg) { MASSERT( msg.time() == absoluteNext() ); lastChange( msg.time() ) ; nextChange( Time::Zero ) ; Coordinator::DependentList::const_iterator cursor; CollectMessage collect( msg.time(), this->Processor::id() ); for ( cursor = dependents.begin() ; cursor != dependents.end() ; cursor++) { if (cursor->second == msg.time()) { synchronizeList.insert(cursor->first); doneCount(doneCount() + 1); SingleMsgAdm::Instance().send(collect, cursor->first); } } return *this; } Coordinator &Coordinator::sortExternalMessage(const ExternalMessage &msg) { const InfluenceList &influList(msg.port().influences() ); InfluenceList::const_iterator cursor(influList.begin()); ExternalMessage xMsg(msg); xMsg.procId( this->Processor::id() ); for (; cursor != influList.end() ; cursor++) { synchronizeList.insert( (*cursor)->model()); xMsg.port(*(*cursor)); SingleMsgAdm::Instance().send(xMsg, (*cursor)->model() ); } return *this; } Coordinator &Coordinator::sortOutputMessage(const OutputMessage &msg) { Coupled &coupled( static_cast< Coupled & >( model() ) ); const InfluenceList &influList(msg.port().influences() ); InfluenceList::const_iterator cursor( influList.begin() ); OutputMessage outMsg( msg ); outMsg.procId( this->Processor::id() ); ExternalMessage extMsg; extMsg.time( msg.time() ); extMsg.procId( this->Processor::id() ); extMsg.value( msg.value() ); for (; cursor != influList.end() ; cursor++) { if (coupled.outputPorts().find( (*cursor)->id() ) == coupled.outputPorts().end() ) { synchronizeList.insert((*cursor)->model()); extMsg.port( *(*cursor) ); SingleMsgAdm::Instance().send(extMsg, (*cursor)->model()); } else { outMsg.port( *(*cursor) ); SingleMsgAdm::Instance().send(outMsg, coupled.parentId() ); } return *this; } /******************************************************************* * Function Name: receive * Description: sends the * message to the inminent child ********************************************************************/ Processor &Coordinator::receive( const InternalMessage &msg ) { MASSERTMSG( doneCount() == 0 , "Received an InternalMessage and doneCount is not zero!"); for (MessageBag::iterator extMsgs = externalMsgs.begin(); extMsgs != externalMsgs.end(); extMsgs++) { sortExternalMessage( *( (ExteranlMessage*)(* extMsgs) ) ); } externalMsgs.eraseAll(); ProcSet::const_iterator cursor; InternalMessage internal(msg.time(), this->Processor::id()); for( cursor = synchronizeList.begin() ; cursor != synchronizeList.end() ; cursor++) { doneCount( doneCount() + 1); SingleMsgAdm::Instance().send(internal, *cursor); } synchronizeList.erase(synchronizeList.begin(), synchronizeList.end()); return *this; /* doneCount(1) ; InternalMessage internal( msg.time(), this->Processor::id() ) ; SingleMsgAdm::Instance().send( internal, inminentChild() ) ; return *this ; */ } /******************************************************************* * Function Name: receive * Description: translates the output event to a X messages for the * influenced children and to a Y message for his father ********************************************************************/ Processor &Coordinator::receive( const OutputMessage &msg ) { sortOutputMessage( msg); return *this; /* Coupled &coupled( static_cast< Coupled & >( model() ) ); const InfluenceList &influList( msg.port().influences() ); InfluenceList::const_iterator cursor( influList.begin() ); OutputMessage outMsg( msg ); outMsg.procId( this->Processor::id() ); ExternalMessage extMsg; extMsg.time( msg.time() ); extMsg.procId( Processor::id() ); extMsg.value( msg.value() ); for( ; cursor != influList.end(); cursor++ ) { // not found in the output list => send it as an external message if( coupled.outputPorts().find( (*cursor)->id() ) == coupled.outputPorts().end() ) { doneCount(doneCount() + 1) ; extMsg.port( * (*cursor) ); SingleMsgAdm::Instance().send( extMsg, (*cursor)->model() ); } else { outMsg.port( * (*cursor) ); SingleMsgAdm::Instance().send( outMsg, coupled.parentId() ); } } return *this; */ } /******************************************************************* * Function Name: receive * Description: recalculates the inminent child and sends it to his * father inside a done message ********************************************************************/ Processor &Coordinator::receive( const DoneMessage &msg ) { MASSERTMSG( doneCount() > 0 , "Unexpected Done Message!"); Coupled &coupled = static_cast< Coupled & >( model() ); doneCount( doneCount() - 1); // dependents[msg.id()] = msg.time() + msg.nextChange(); dependents[ SingleProcessorAdmin::Instance().processor(msg.id()).model().id() ] = msg.time() + msg.nextChange(); if( doneCount() == 0) { lastChange (msg.time()); nextChange( calculateNextChange( msg.time() ) ); DoneMessage doneMsg (msg.time(), this->Processor::id() , nextChange()); SingleMsgAdm::Instance().send(doneMsg, coupled.parentId()); } return *this; /* Coupled &coupled( static_cast< Coupled & >( model() ) ); MASSERTMSG( doneCount() > 0, "Unexpected Done message!" ); if( ! --donecount ) { lastChange( msg.time() ); recalcInminentChild(); if( inminentChild() == Processor::InvalidId ) nextChange( Time::Inf ); else { Processor &proc = SingleProcessorAdmin::Instance().processor( inminentChild() ); nextChange( proc.model().absoluteNext() - msg.time() ); } DoneMessage doneMsg( msg.time(), this->Processor::id(), coupled.nextChange() ); for (long i = 1; i <= receivedX; i++) SingleMsgAdm::Instance().send( doneMsg, coupled.parentId() ); receivedX = 1; } return *this; */ } /******************************************************************* * Function Name: id ********************************************************************/ Processor &Coordinator::id( const ProcId &pid ) { this->Processor::id( pid ); MASSERT( pmodel() ); pmodel()->id( pid ); return *this; } /******************************************************************* * Function Name: recalcInminentChild ********************************************************************/ Coordinator &Coordinator::recalcInminentChild() { Coupled &coupled( static_cast< Coupled & >( model() ) ); Coupled::ModelList::const_iterator cursor( coupled.children().begin() ) ; Time min( Time::Inf ); inminentChild( Processor::InvalidId ); for( ; cursor != coupled.children().end() ; cursor++ ) { Model &model( SingleProcessorAdmin::Instance().processor( *(cursor) ).model() ) ; if( min > model.absoluteNext() ) { min = model.absoluteNext(); inminentChild( *(cursor) ); } } return *this; } Time &Coordinator::calculateNextChange(const Time& time) const { Time next(Time::Inf); Coordinator::DependentList::const_iterator; for ( cursor = dependents.begin(); cursor != dependents.end() ; cursor++) { if(cursor->second < next) next = cursor->second; } next -= time; }