#include "mcoordin.h"
#include "coupled.h"
#include "msgadm.h"
#include "model.h"

/*******************************************************************
* Function Name: MCoordinator
* Description: Builds the coordinator on top of Coordinator, stores the
*              coupled model's parent id and registers in 'slaves' every
*              processor id found in the model partition that differs
*              from this processor's own id. Each registered entry starts
*              with a next-change time of Time::Inf.
********************************************************************/
MCoordinator::MCoordinator( Coupled *coupled )
	: Coordinator( coupled )
{
	parentId = coupled->parentId();

	// Bind the partition once so that begin() and end() are guaranteed to
	// refer to the same container, whatever modelPartition() returns
	// (a const reference also keeps a by-value result alive).
	const ModelPartition &partition = coupled->modelPartition();

	ModelPartition::const_iterator cursor;
	for( cursor = partition.begin() ; cursor != partition.end() ; ++cursor )
	{
		if( this->Processor::id() != cursor->second )
			slaves[ cursor->second ] = Time::Inf;
	}
}

/*******************************************************************
* Function Name: receive
* Description: Handles an InitMessage. Sets the pending-done counter to
*              the number of slaves plus dependents and forwards a new
*              InitMessage (same time, this processor as sender) to each
*              of them: slaves first, then dependents.
* Returns: *this
********************************************************************/
Processor &MCoordinator::receive( const InitMessage &msg )
{
	// receive( DoneMessage ) decrements this counter once per answer.
	doneCount( dependents.size() + slaves.size() );

	InitMessage init( msg.time(), this->Processor::id() );

	DependentList::const_iterator cursor;

	for( cursor = slaves.begin() ; cursor != slaves.end() ; ++cursor )
		SingleMsgAdm::Instance().send( init, cursor->first );

	for( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
		SingleMsgAdm::Instance().send( init, cursor->first );

	return *this;
}

/*******************************************************************
* Function Name: receive
* Description: Handles a CollectMessage. Asserts that the message time
*              equals absoluteNext(), records it as the last change and
*              resets the next change to Time::Zero. Every dependent and
*              every slave whose stored time equals the message time is
*              added to synchronizeList, counted as a pending done and
*              sent a CollectMessage stamped with this processor's id.
* Returns: *this
********************************************************************/
Processor &MCoordinator::receive( const CollectMessage &msg )
{
	MASSERT( msg.time() == absoluteNext() );

	lastChange( msg.time() );
	nextChange( Time::Zero );

	CollectMessage collectMsg( msg.time(), this->Processor::id() );

	DependentList::const_iterator cursor;

	for( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
	{
		if( cursor->second == msg.time() )
		{
			synchronizeList.insert( cursor->first );
			doneCount( doneCount() + 1 );
			SingleMsgAdm::Instance().send( collectMsg, cursor->first );
		}
	}

	for( cursor = slaves.begin() ; cursor != slaves.end() ; ++cursor )
	{
		if( cursor->second == msg.time() )
		{
			synchronizeList.insert( cursor->first );
			doneCount( doneCount() + 1 );
			SingleMsgAdm::Instance().send( collectMsg, cursor->first );
		}
	}

	return *this;
}

/*******************************************************************
* Function Name: sortExternalMessage
* Description: Routes an ExternalMessage along the influences of its
*              port. An influenced port whose model reports
*              isLocalMaster() receives a copy addressed to that port on
*              the model's local processor. Otherwise the message is
*              sent, with the original port, to the model's parentId();
*              each such id is served at most once per call.
*              Every destination is added to synchronizeList.
* Returns: *this
********************************************************************/
Processor &MCoordinator::sortExternalMessage( const ExternalMessage &msg )
{
	ProcSet procs;	// remote ids that already got the message in this call

	const InfluenceList &influList( msg.port().influences() );

	ExternalMessage extMsg( msg );
	extMsg.procId( this->Processor::id() );

	InfluenceList::const_iterator cursor;
	for( cursor = influList.begin() ; cursor != influList.end() ; ++cursor )
	{
		if( (*cursor)->model().isLocalMaster() )
		{
			synchronizeList.insert( (*cursor)->model().localProc() );
			extMsg.port( *(*cursor) );
			SingleMsgAdm::Instance().send( extMsg, (*cursor)->model().localProc() );
		}
		else
		{
			const ProcId &slaveId = (*cursor)->model().parentId();

			if( procs.find( slaveId ) == procs.end() )
			{
				synchronizeList.insert( slaveId );
				procs.insert( slaveId );

				// A previous local delivery may have retargeted extMsg,
				// so put the original port back before sending.
				extMsg.port( msg.port() );
				SingleMsgAdm::Instance().send( extMsg, slaveId );
			}
		}
	}

	return *this;
}

/*******************************************************************
* Function Name: sortOutputMessage
* Description: Routes an OutputMessage along the influences of its port.
*              If the influenced port is one of the coupled model's
*              output ports, the output message is retargeted to that
*              port and sent to parentId. Otherwise the output is
*              translated into an ExternalMessage (same time and value,
*              sender model taken from the original port) and delivered
*              like in sortExternalMessage, except that a remote id equal
*              to the id carried by the incoming message is skipped.
* Returns: *this
********************************************************************/
Processor &MCoordinator::sortOutputMessage( const OutputMessage &msg )
{
	ProcSet procs;	// remote ids that already got the message in this call

	Coupled &coupled = static_cast<Coupled &>( model() );

	const InfluenceList &influList = msg.port().influences();

	OutputMessage outMsg( msg );
	outMsg.procId( this->Processor::id() );

	ExternalMessage extMsg;
	extMsg.time( msg.time() );
	extMsg.procId( this->Processor::id() );
	extMsg.value( msg.value() );
	extMsg.senderModelId( msg.port().modelId() );

	InfluenceList::const_iterator cursor;
	for( cursor = influList.begin() ; cursor != influList.end() ; ++cursor )
	{
		if( coupled.outputPorts().find( (*cursor)->id() ) == coupled.outputPorts().end() )
		{
			// Not an output port of the coupled model: deliver as external.
			if( (*cursor)->model().isLocalMaster() )
			{
				synchronizeList.insert( (*cursor)->model().localProc() );
				extMsg.port( *(*cursor) );
				SingleMsgAdm::Instance().send( extMsg, (*cursor)->model().localProc() );
			}
			else
			{
				const ProcId &slaveId = (*cursor)->model().parentId();

				// Do not send the message back to the id it came with,
				// and serve every remote id only once.
				if( slaveId != msg.procId() && ( procs.find( slaveId ) == procs.end() ) )
				{
					synchronizeList.insert( slaveId );
					procs.insert( slaveId );
					extMsg.port( msg.port() );
					SingleMsgAdm::Instance().send( extMsg, slaveId );
				}
			}
		}
		else
		{
			// Output port of the coupled model: pass the output upwards.
			outMsg.port( *(*cursor) );
			SingleMsgAdm::Instance().send( outMsg, parentId );
		}
	}

	return *this;
}

/*******************************************************************
* Function Name: receive
* Description: Handles a DoneMessage. Decrements the pending-done counter
*              and stores the sender's absolute next time (message time
*              plus its next change) in 'slaves' or 'dependents',
*              depending on isFromSlave(). When the counter reaches zero
*              the coordinator updates its own last/next change and sends
*              a DoneMessage to parentId.
* Returns: *this
********************************************************************/
Processor &MCoordinator::receive( const DoneMessage &msg )
{
	MASSERTMSG( doneCount() > 0, "Unexpected Done Message! \n" );

	doneCount( doneCount() - 1 );

	if( msg.isFromSlave() )
		slaves[ msg.procId() ] = msg.time() + msg.nextChange();
	else
		dependents[ msg.procId() ] = msg.time() + msg.nextChange();

	if( doneCount() == 0 )
	{
		lastChange( msg.time() );
		nextChange( calculateNextChange( msg.time() ) );

		DoneMessage doneMsg( msg.time(), this->Processor::id(), nextChange(), false );
		SingleMsgAdm::Instance().send( doneMsg, parentId );
	}

	return *this;
}

/*******************************************************************
* Function Name: calculateNextChange
* Description: Finds the smallest time stored in 'slaves' and
*              'dependents' (Time::Inf when both are empty) and returns
*              it minus 'time'.
* Returns: a reference to a function-static Time holding the result. The
*          value is overwritten by the next call, so callers must copy
*          it before calling again.
* NOTE(review): the result used to live in an automatic variable, which
*          left the returned reference dangling. Static storage keeps the
*          declared signature valid; it is shared by all instances and is
*          not safe for concurrent calls. Returning Time by value would be
*          the cleaner fix but requires changing the class declaration.
********************************************************************/
Time &MCoordinator::calculateNextChange( const Time &time ) const
{
	static Time next( Time::Inf );
	next = Time::Inf;	// reset: the static keeps the previous call's result

	Coordinator::DependentList::const_iterator cursor;

	for( cursor = slaves.begin() ; cursor != slaves.end() ; ++cursor )
		if( cursor->second < next )
			next = cursor->second;

	for( cursor = dependents.begin() ; cursor != dependents.end() ; ++cursor )
		if( cursor->second < next )
			next = cursor->second;

	next = next - time;

	return next;
}