/******************************************************************* * * DESCRIPTION: class ParallelProcessor ( Base class for ParallelCoordinator, * ParallelSimulator and ParallelRootCoordinator ) * * AUTHOR: Alejandro Troccoli * Revisied By: Qi (Jacky) Liu (v1) * * EMAIL: mailto://atroccol@dc.uba.ar * mailto://liuqi@sce.carleton.ca * * DATE: 06/11/2000 * DATE (v1) : 10/18/2005 *******************************************************************/ #ifndef __PPROCESSOR_H #define __PPROCESSOR_H /** include files **/ #include "fstream.h" #include "SimulationObj.hh" // SimulationObj #include "pProcessorState.h" //ProcessorState #include "modelid.h" // class ModelId #include "VTime.hh" // class Time #include "except.h" #include "value.h" #include "msgbag.h" #include "JackyDebugStream.h" //for jacky-debug-mode & jacky-fileQ & jacky-revision /** forward declarations **/ class Model ; class BasicMsgValue; class Message; class InitMessage ; class InternalMessage ; class BasicOutputMessage; class OutputMessage ; class BasicExternalMessage; class ExternalMessage ; class DoneMessage ; class CollectMessage; class MessageBag; class OutputSyncMessage; class TWMessage; class ParallelProcessorAdmin ; class Port ; /** declarations **/ typedef ModelId ProcId ; class ParallelProcessor : public SimulationObj { public: virtual ~ParallelProcessor(); // Destructor //Functions that are required by Warped void initialize(); void executeProcess(); BasicState* allocateState(); #ifndef JACKY_REVISION //Jacky Note: this variable is moved to ParallelSimulator class since only a parallel // simulator should have a FC as its parent! // Oct. 19, 2005 // The simulator's parent is the NodeCoordinator (the value is set upon the reception of the Init message) ProcId parentFCId; #endif // all the posible messages //All synchronization messages (Init, Internal, Done & Collect ) //are received by &. There is no need to worry about memory deallocation since //there is code already written to handle that. //Instead, Input and Output messages are received through a pointer, and //will then be the owner of the pointer and will have to call delete //when finished! virtual ParallelProcessor &receive( const InitMessage &) ; virtual ParallelProcessor &receive( const InternalMessage &) ; virtual ParallelProcessor &receive( const DoneMessage &) ; virtual ParallelProcessor &receive( const CollectMessage &); virtual ParallelProcessor &receive( const OutputSyncMessage &); virtual ParallelProcessor &receive( const BasicOutputMessage *) ; virtual ParallelProcessor &receive( const BasicExternalMessage *) ; // the Processor Id const ProcId &id() const { return ident;} // return the asociate model Model &model() ; const Model &model() const ; const VTime &nextChange() const ; const VTime &lastChange() const ; const VTime absoluteNext() const { return nextChange() + lastChange(); } const string asString() const ; virtual const string description() const; static const ProcId InvalidId ; #ifdef JACKY_STATE //=========================================================== //Jacky: this function will be defined in all processors to clean up // locally defined variables (mainly MessageBag & NCMessageBag) //Oct. 26, 2005 virtual void rollbackProcessorVariables( const VTime& ) ; #endif //end JACKY_STATE ==================================================== #ifdef JACKY_DEBUG //Jacky: this function will be defined in processors to show how locally defined // variables are cleaned in a rollback virtual void showLocallyDefinedVariables( ostream& ) ; #endif //end JACKY_DEBUG protected: friend class Model ; friend class ParallelProcessorAdmin ; friend class ParallelRoot; ParallelProcessor() ; // Default constructor ParallelProcessor( Model * ) ; //ParallelProcessor( const ParallelProcessor & ); //Copy constructor ParallelProcessor &operator =( const ParallelProcessor & ) ; // Assignment operator int operator ==( const ParallelProcessor & ) const ; // Equality operator ParallelProcessor &id( const ProcId & ) ; ParallelProcessor &nextChange( const VTime & ) ; ParallelProcessor &lastChange( const VTime & ) ; //Checks if a rollback has taken place and cleans local variables. virtual bool rollbackCheck(const VTime& currentTime); // send virtual ParallelProcessor &send( const InitMessage &, const ProcId &dest ) ; virtual ParallelProcessor &send( const InternalMessage &, const ProcId &dest ) ; virtual ParallelProcessor &send( const CollectMessage &, const ProcId &dest ); virtual ParallelProcessor &send( const BasicOutputMessage &, const ProcId &dest ) ; virtual ParallelProcessor &send( const BasicExternalMessage &, const ProcId &dest) ; virtual ParallelProcessor &send( const DoneMessage &, const ProcId &dest) ; virtual ParallelProcessor &send( const OutputSyncMessage &, const ProcId &dest); virtual ParallelProcessor &sendOutput( const VTime &, const Port &, const Value & ); virtual ParallelProcessor &sendOutput( const VTime &, const Port &, BasicMsgValue *); Model *pmodel(); #ifdef JACKY_DEBUG //***************************************************************** //Nov. 3, 2005 //This function is used to show the next event that will be executed in the next cycle void showTheNextEvent(); #endif //***************************************************************************** unsigned createlog; int logIndex; #ifndef JACKY_FILEQ //=================================================================== //Jacky: This must be commented out since it redefined the same variable existing // in TimeWarp class, which causes FileQueue cannot be rolledback! // Oct. 18, 2005 int numOutFiles; #endif //================================================================================ /*Jacky Note: ** The MessageBag cannot be put in the state of processors (due to the reasons as ** described in class ParallelNodeCoordinator) ** Oct. 26, 2005 */ MessageBag externalMsgs; VTime lastMsgTime; #ifndef KERNEL_TIMEWARP //If the NoTime Kernel is used, create a FileQueue. ostream** outFileQ; #endif private: void sendTWMessage(TWMessage*, const Message&, const ProcId &dest); //Sends a TWMessage void writelog( const VTime&, const string& ); Model *mdl ; ProcId ident ; //VTime next ; These are now part of the ParProcessorState //VTime last ; }; // class ParallelProcessor /** inline **/ inline ParallelProcessor::ParallelProcessor() : mdl( NULL ) {} inline Model &ParallelProcessor::model() { return *mdl ; } inline const Model &ParallelProcessor::model() const { MASSERT( mdl ) ; return *mdl ; } inline const VTime &ParallelProcessor::nextChange() const { return ((ParallelProcessorState *)state->current)->next; } inline const VTime &ParallelProcessor::lastChange() const { return ((ParallelProcessorState *)state->current)->last; } inline ParallelProcessor &ParallelProcessor::nextChange( const VTime &t ) { ((ParallelProcessorState *)state->current)->next = t ; return *this ; } inline ParallelProcessor &ParallelProcessor::lastChange( const VTime &t ) { ((ParallelProcessorState *)state->current)->last = t ; return *this ; } inline const string ParallelProcessor::asString() const { return description() + "(" + id() + ")" ; } inline Model *ParallelProcessor::pmodel() { return mdl ; } #ifdef JACKY_STATE //=========================================================== /******************************************************************* * Function Name: rollbackProcessorVariables * Description: clean up locally defined variables during rollbacks * Oct. 26, 2005 ********************************************************************/ inline void ParallelProcessor::rollbackProcessorVariables( const VTime& rollbackTime ) { if( externalMsgs.size() > 0 && ( rollbackTime < externalMsgs.time() || rollbackTime == externalMsgs.time() ) ){ externalMsgs.eraseAll(); } } #endif //end JACKY_STATE ==================================================== #ifdef JACKY_DEBUG /******************************************************************* * Function Name: showLocallyDefinedVariables * Description: show how locally defined variables are cleaned during * rollbacks for debugging purpose * Oct. 26, 2005 ********************************************************************/ inline void ParallelProcessor::showLocallyDefinedVariables( ostream& out ) { out << ">>>>Processor[" << id() << "] showLocallyDefinedVariables(), MessageBag is =>" << endl << flush; out << externalMsgs << endl << flush; } #endif //end JACKY_DEBUG #endif //__PPROCESSOR_H