#include "scoordin.h"
#include "coupled.h"
#include "model.h"
#include "msgadm.h"

/**
 * Builds a coordinator for the given coupled model and caches the id
 * reported by coupled->masterId(); Done and Output messages are later
 * sent to that id.
 */
SCoordinator::SCoordinator(Coupled * coupled)
    : Coordinator(coupled)
{
    parentId = coupled->masterId();
}

/**
 * Handles an Init message: expects one Done reply per dependent and
 * forwards an Init message (same time, this processor as sender) to
 * every entry of `dependents`.
 */
Processor &SCoordinator::receive( const InitMessage &msg)
{
    // One outstanding Done is expected for each dependent.
    doneCount( dependents.size() );

    InitMessage initMsg( msg.time(), this->Processor::id() );

    DependentList::const_iterator cursor;
    for ( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
        SingleMsgAdm::Instance().send( initMsg, cursor->first );

    return *this;
}

/**
 * Handles a Collect message: asserts that the message time equals
 * absoluteNext(), records the change time, and forwards the Collect to
 * every dependent whose stored time equals the message time.
 * Each dependent contacted is added to `synchronizeList` and increases
 * the pending Done count by one.
 */
Processor &SCoordinator::receive( const CollectMessage &msg)
{
    //cout << "Scoordinator::receive before ASSERTION " << endl;
    // cout << "msg.time() : " << msg.time().asString() << " , absoluteNext() : " << absoluteNext() << endl;
    MASSERT( msg.time() == absoluteNext() );
    // cout << "Scoordinator::receive after ASSERTION " << endl;

    lastChange( msg.time() );
    nextChange( Time::Zero );

    CollectMessage collectMsg( msg.time(), this->Processor::id() );

    DependentList::const_iterator cursor;
    for ( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
    {
        // Only dependents whose recorded time matches this message take part.
        if ( cursor->second == msg.time() )
        {
            synchronizeList.insert( cursor->first );
            doneCount( doneCount() + 1 );
            SingleMsgAdm::Instance().send( collectMsg, cursor->first );
        }
    }

    return *this;
}

/**
 * Routes an External message to the ports influenced by its port.
 * For every influenced port whose model reports isLocalMaster(), the
 * model's local processor is added to `synchronizeList` and receives a
 * copy of the message retargeted to that port, with this processor as
 * sender. Other influenced ports are ignored here.
 */
Processor &SCoordinator::sortExternalMessage(const ExternalMessage &msg)
{
    const InfluenceList &influList( msg.port().influences() );

    ExternalMessage extMsg( msg );
    extMsg.procId( this->Processor::id() );

    InfluenceList::const_iterator cursor;
    for ( cursor = influList.begin() ; cursor != influList.end() ; ++cursor )
    {
        if ( (*cursor)->model().isLocalMaster() )
        {
            synchronizeList.insert( (*cursor)->model().localProc() );
            // doneCount( doneCount() + 1 );
            extMsg.port( *(*cursor) );
            SingleMsgAdm::Instance().send( extMsg, (*cursor)->model().localProc() );
        }
    }

    return *this;
}

/**
 * Handles a Done message from a dependent: decrements the pending count
 * and stores msg.time() + msg.nextChange() for the sender.
 * When the count reaches zero, updates lastChange/nextChange and sends
 * a Done message to `parentId`.
 * Asserts that at least one Done was pending.
 */
Processor &SCoordinator::receive(const DoneMessage &msg)
{
    MASSERTMSG( doneCount() > 0 , "Unexpected Done Message!");

    doneCount( doneCount() - 1 );
    dependents[ msg.procId() ] = msg.time() + msg.nextChange();

    if ( doneCount() == 0 )
    {
        lastChange( msg.time() );
        nextChange( calculateNextChange( msg.time() ) );

        DoneMessage doneMsg( msg.time(), this->Processor::id(), nextChange(), true );
        SingleMsgAdm::Instance().send( doneMsg, parentId );
    }

    return *this;
}

/**
 * Routes an Output message to the ports influenced by its port.
 * An influenced port that is not one of the coupled model's output
 * ports and whose model reports isLocalMaster() receives an External
 * message carrying the same time and value; its local processor is
 * added to `synchronizeList`.
 * Any other influenced port causes the Output message to be forwarded
 * to `parentId`, at most once per call.
 */
Processor &SCoordinator::sortOutputMessage(const OutputMessage &msg)
{
    bool sentToMaster = false;

    Coupled &coupled = static_cast<Coupled &>( model() );
    const InfluenceList &influList = msg.port().influences();

    OutputMessage outMsg( msg );
    outMsg.procId( this->Processor::id() );

    ExternalMessage extMsg;
    extMsg.time( msg.time() );
    extMsg.procId( this->Processor::id() );
    extMsg.value( msg.value() );
    extMsg.senderModelId( msg.port().modelId() );

    InfluenceList::const_iterator cursor;
    for ( cursor = influList.begin() ; cursor != influList.end() ; ++cursor )
    {
        const bool isOutputPort =
            coupled.outputPorts().find( (*cursor)->id() ) != coupled.outputPorts().end();

        // isLocalMaster() is only queried for ports that are not output
        // ports of the coupled model (short-circuit evaluation).
        if ( !isOutputPort && (*cursor)->model().isLocalMaster() )
        {
            synchronizeList.insert( (*cursor)->model().localProc() );
            extMsg.port( *(*cursor) );
            SingleMsgAdm::Instance().send( extMsg, (*cursor)->model().localProc() );
        }
        else if ( !sentToMaster )
        {
            // Forward to the parent a single time, however many
            // influenced ports fall into this case.
            SingleMsgAdm::Instance().send( outMsg, parentId );
            sentToMaster = true;
        }
    }

    return *this;
}

/**
 * Returns the smallest time stored in `dependents` (Time::Inf when
 * there are none) minus `time`.
 *
 * The declared return type is a reference, so the result is kept in a
 * function-local static object rather than an automatic variable;
 * returning a reference to an automatic local would dangle.
 * The returned reference is overwritten by the next call, so callers
 * should copy the value before calling again.
 * NOTE(review): the static makes this function non-reentrant; confirm
 * that message handling is single-threaded, or change the declared
 * return type to `Time` in the header.
 */
Time &SCoordinator::calculateNextChange(const Time &time) const
{
    // Work on a local first so that `time` may safely alias the
    // previously returned reference.
    Time earliest( Time::Inf );

    DependentList::const_iterator cursor;
    for ( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
        if ( cursor->second < earliest )
            earliest = cursor->second;

    earliest -= time;

    static Time result( Time::Inf );
    result = earliest;
    return result;
}