/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Sept. 6, 2005 *******************************************************************/ #ifndef LTSFINPUTQUEUE_HH #define LTSFINPUTQUEUE_HH // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. // // $Id: LTSFInputQueue.hh,v 1.1.1.1 2007/03/15 15:45:06 rmadhoun Exp $ //--------------------------------------------------------------------------- #include "config.hh" #include #include #include "BasicEvent.hh" #include "MultiList.hh" extern int BasicEventCompareRecv(const BasicEvent*, const BasicEvent*); class LTSFInputQueue : public MultiList { public: friend ostream& operator<< (ostream&, const LTSFInputQueue& ); LTSFInputQueue(); ~LTSFInputQueue(); bool insert(BasicEvent *toInsert, int id); BasicEvent* getAndSeek(int id, int localId); BasicEvent* getAndSeekObj(int id, int localId); BasicEvent* find(VTime&, int, findMode_t = EQUAL, int id=0); BasicEvent* getEventToSchedule(); void saveCurrent() {oldCurrentPtr = currentPos; }; void restoreCurrent() {currentPos = oldCurrentPtr; }; int gcollect(VTime&, int); int gcollect(VTime, int, BasicEvent*); int getNumberOfCommittedEvents(){ return noOfEventsCommitted; }; VTime calculateMin(); void print(ostream& = cout) const; void reset(); void printMiniList(int); VTime getNextUnprocessedEventTime(); bool initialized; BasicEvent *lastInserted; VTime lastGVT; int memoryUseage; #ifdef STATEDEBUG ofstream *outFile; void setStateFile (ofstream *outfile) { outFile = outfile; } #endif #ifdef JACKY_COUNT_MESSAGE_NUMBER //[2006-03-31] //define variables to measure the performance unsigned long positiveMsgNumber; unsigned long negativeMsgNumber; unsigned long rollbackNumber; unsigned long positiveStragglerMsgNumber; unsigned long negativeStragglerMsgNumber; unsigned long totalNumberOfEventsRolledBack; //Total number of events unprocessed during rollbacks #endif //end JACKY_COUNT_MESSAGE_NUMBER [2006-03-31] #ifdef JACKY_DEBUG // Nov. 17, 2005 ---------------------------------------------------------- //function to print the inputQ for the given processor void printMiniInputQ( ostream& os, int localId ); //function to print the whole inputQ void printFullInputQ( ostream& os ); #endif //------------------------------------------------------------------------------------ #ifdef JACKY_RB_EXCEPTION // Nov. 5, 2005 ==================================================== //function to handle rollback exceptional situations, where we need to remove a straggler //event from the inputQ. This is for the following situation: //The NC sent the 1st (X) message in the NCBag to the FC and triggered rollbacks on that //machine. This straggler (X) message (event) should be removed form the inputQ //we use the similar logic for message implosion as in the insert function. The difference //is that the implosion is triggered by the straggler position message instead of a negative //one. void removeStragglerEvent(BasicEvent *toRemove, int id); #endif //end JACKY_RB_EXCEPTION =============================================================== #ifdef JACKY_NC_BP_STATE //[2006-02-02] //function to recover from the "breakpoint" state on the NC's stateQ //1. unprcess the BasicEvent passed in, which is the inputPos of the breakpoint state //2. make this unprocessed Basicevent the next event to be executed void reExecuteImmediately(BasicEvent *inputPos, int localId); #endif //end JACKY_NC_BP_STATE [2006-02-02] #ifdef JACKY_NCBAG_POINTERS //[2006-02-03] //function to unprocess the BasicEvent from which an (X) message is derived. //this is used for the NC to unprocess (X) messages in the NCBag that have recvTime > minTime void unprocessXEvent(BasicEvent* toUnprocess, int localId); //function to adjust the VTime in lVTArray[] for simuObj with localId //this is used for the NC to adjust the lVTArray[ncLocalId] to the minTime after resending //(X) messages in the NCBag void resetLVTArrayTime(VTime newTime, int localId); #endif //end JACKY_NCBAG_POINTERS [2006-02-03] protected: #ifndef JACKY_REVISION bool miniListRollback(BasicEvent* , int ); #else //Jacky's version : use a bool flag to indicate whether we should do a secondary rollback when a (-) msg // is received. This flag has FALSE as its default value bool miniListRollback(BasicEvent* , int , bool implodeProcessed = false ); #endif #ifndef ONE_ANTI_MESSAGE //[2006-03-09] //Jacky Note: //this function is used only when "ONE_ANTI_MESSAGE" is not used void miniListUnprocessMessages(BasicEvent* , int ); #else //[2006-03-09] This is for ONE_ANTI_MESSAGE //Jacky Note: //this function is for "ONE_ANTI_MESSAGE" optimization! //we need to do different things based on the "implodeProcessed" flag passed in: // 1. if implodeProcessed = true (i.e. the imploded [+] msg has been processed) // --> we need to do: // a> cancel all [+] msgs that were from the same sender and have sendTime >= [-]msg.sendTime // b> unprocess all other messages with recvTime >= [-]msg.recvTime // 2. if implodeProcessed = false (i.e. the imploded [+] msg has not yet been processed) // --> we only need to do message cancellation! No message unprocessing should be done in this case! void miniListUnprocessMessages(BasicEvent* , int , bool implodeProcessed = false ); #endif //end ONE_ANTI_MESSAGE [2006-03-09] VTime getLVTForThisObj(int id); BasicEvent* oldCurrentPtr; BasicEvent* tmpCurrentPtr; BasicEvent *unprocessPtr; int noOfEventsCommitted; }; inline VTime LTSFInputQueue::calculateMin(){ register VTime retval = PINFINITY; register BasicEvent *ptr; ptr = MultiList::get(); if(ptr != NULL){ retval = ptr->recvTime; } return retval; } inline VTime LTSFInputQueue::getNextUnprocessedEventTime() { register VTime retval = PINFINITY; register BasicEvent* ptr = get(); while((ptr != NULL) &&(ptr->alreadyProcessed == true)) { ptr = ptr->next; } if(ptr == NULL) { return retval; } else { return ptr->recvTime; } } #endif // LTSFINPUTQUEUE_HH