/******************************************************************* * Revised By: Qi (Jacky) Liu * * EMAIL: mailto://liuqi@sce.carleton.ca * * Revision Date: Sept. 6, 2005 *******************************************************************/ //-*-c++-*- #ifndef OUTPUT_QUEUE_CC #define OUTPUT_QUEUE_CC // Copyright (c) 1994-1996 Ohio Board of Regents and the University of // Cincinnati. All Rights Reserved. // // BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY // FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT // PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, // EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE // PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME // THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. // // IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING // WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR // REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR // DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL // DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM // (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED // INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF // THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER // OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. // // $Id: OutputQueue.cc,v 1.1.1.1 2007/03/15 15:45:05 rmadhoun Exp $ // // // //--------------------------------------------------------------------------- #include "OutputQueue.hh" #include "BasicTimeWarp.hh" #include "../../../JackyDebugStream.h" //for jacky-debug-mode #ifdef JACKY_DEBUG #include "../../../message.h" #include "../../../pmessage.h" #endif OutputQueue::~OutputQueue() {} void OutputQueue::reset() { currentPos = NULL; insertPos = NULL; findPos = NULL; head = NULL; tail = NULL; listsize = 0; } BasicEvent* OutputQueue::find(VTime findTime, findMode_t mode) { static BasicEvent key; BasicEvent* findEvent; key.sendTime = findTime; findEvent = SortedList::find(&key, mode ); return(findEvent); } //[2006-03-20] //Jacky: this function is called by GVTManager::gcollect() when InfreqStateManager is used! void OutputQueue::gcollect(VTime gtime, ObjectRecord* simArray) { BasicEvent *delptr; saveCurrent(); delptr = seek(0, START); //Jacky: delete all elements on the outputQ whose sendTime < gVT while (delptr != NULL && delptr->sendTime < gtime) { // check to see if the event is local if(simArray[delptr->dest].ptr != NULL && simArray[delptr->dest].ptr->isCommManager()) { // this advances currentPos and deletes the container and the event since the event is non-local delete [] (char *) removeCurrent(); } else { // this event is local and will be deleted by the input queue // removeCurrent will delete only the container removeCurrent(); } delptr = get(); // get new element at currentPos } restoreCurrent(); } #ifdef STATEDEBUG void OutputQueue::gcollect(Container*& outputQptr, ObjectRecord* simArray, bool myLastOutput, StateManager* state) { #else //[2006-03-20] //Jacky: this function is called by GVTManager::gcollect() when StateManager is used! void OutputQueue::gcollect(VTime gtime, Container*& outputQptr, ObjectRecord* simArray, bool myLastOutput){ #endif Container* temp; BasicEvent *delptr; if (outputQptr == NULL) { // Nothing to garbage collect ... // save time and get out right away ... return; } // save currentPos for later since we are going to modify it saveCurrent(); delptr = seek(0, START); temp = getCurrent(); while (temp != NULL && temp->object != NULL) { if(temp->object->sendTime <= gtime){ #ifdef STATEDEBUG state->checkEvent(getCurrent(), 2); #endif // this advances currentPos, deletes container delptr = removeCurrent(); // Events send to another LP are not in inputQ and therefore they have // to be handled/removed from here if(simArray[delptr->dest].ptr != NULL && simArray[delptr->dest].ptr->isCommManager()) { delete [] ((char *)delptr); } delptr = get(); // get new element at currentPos if (temp == outputQptr) { // If we have deleted upto AND INCLUDING outputQptr break out... break; } temp = currentPos; } else { //break out of the while loop as we dont need to check further break; } } //end while restoreCurrent(); // set currentPos back to what it was } #ifdef JACKY_DEBUG void OutputQueue::printOutputQ(ostream& os){ Container* pelement; unsigned i; os << endl << "outputQ SIZE = " << size() << endl; i = 0; if (size() == 0) { os << "outputQ = (NULL)\n"; } else { for (pelement = getHead(); pelement != NULL; pelement = pelement->next) { BasicEvent* eventptr = pelement->object; //the BasicEvent referred by the cursor if( eventptr != NULL ) { TWMessage* temp = dynamic_cast(eventptr); Message* tempmsg = temp->getMessage(); os << "outputQ[" << i << "] Container = " << pelement << " -> BasicEvent = " << pelement->object <<" {id=" << eventptr->eventId << " sign=" << eventptr->sign << " " << eventptr->sender << "@" << eventptr->sendTime << " =" << tempmsg->type() << "=> " << eventptr->dest << "@" << eventptr->recvTime << " Processed?" << eventptr->alreadyProcessed << flush; #ifdef JACKY_EVENT_JUMP //~~~~~~~~~~~~~~~~~ Nov. 22, 2005 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if(temp->derivedFromExternalEvent() == true){ os << " [Event = T]" << flush; } #endif //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ os << "}" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } i++; } //end for } //end size != 0 if( getCurrent() != NULL ) { BasicEvent* eventptr = getCurrent()->object; //the BasicEvent referred by currentPos if( eventptr != NULL ) { TWMessage* temp = dynamic_cast(eventptr); Message* tempmsg = temp->getMessage(); os << "outputQ::currentPos = " << getCurrent() << " -> BasicEvent = " << eventptr <<" {id=" << eventptr->eventId << " sign=" << eventptr->sign << " " << eventptr->sender << "@" << eventptr->sendTime << " =" << tempmsg->type() << "=> " << eventptr->dest << "@" << eventptr->recvTime << " Processed?" << eventptr->alreadyProcessed << "}" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } } else { os << "outputQ::currentPos Container = NULL!" << endl << flush; } if( getHead() != NULL ){ BasicEvent* eventptr = getHead()->object; //the BasicEvent referred by head if( eventptr != NULL ) { TWMessage* temp = dynamic_cast(eventptr); Message* tempmsg = temp->getMessage(); os << "outputQ::head = " << getHead() << " -> BasicEvent = " << eventptr <<" {id=" << eventptr->eventId << " sign=" << eventptr->sign << " " << eventptr->sender << "@" << eventptr->sendTime << " =" << tempmsg->type() << "=> " << eventptr->dest << "@" << eventptr->recvTime << " Processed?" << eventptr->alreadyProcessed << "}" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } } else { os << "outputQ::head = NULL!" << endl << flush; } if( getTail() != NULL ){ BasicEvent* eventptr = getTail()->object; //the BasicEvent referred by tail if( eventptr != NULL ) { TWMessage* temp = dynamic_cast(eventptr); Message* tempmsg = temp->getMessage(); os << "outputQ::tail = " << getTail() << " -> BasicEvent = " << eventptr <<" {id=" << eventptr->eventId << " sign=" << eventptr->sign << " " << eventptr->sender << "@" << eventptr->sendTime << " =" << tempmsg->type() << "=> " << eventptr->dest << "@" << eventptr->recvTime << " Processed?" << eventptr->alreadyProcessed << "}" << endl << flush; #ifdef JACKY_DESTROY_MESSAGE delete tempmsg; #endif } } else { os << "outputQ::tail = NULL!" << endl << flush; } } #endif //JACKY_DEBUG #ifdef JACKY_REVISION //override the insert function defined in SortedList void OutputQueue::insert(BasicEvent* toInsert){ register Container< BasicEvent >* newelem; newelem = new Container; newelem->object = toInsert; // Initialization necessary here since it is removed from Container() newelem->next = NULL; newelem->prev = NULL; // Mainlist update if (listsize == 0) { // empty list head = newelem; head->prev = NULL; tail = newelem; tail->next = NULL; } else { //non-empty list /* ** Jacky Note: When inserting BasicEvents into the outputQ, if the events have the same send time, they should be inserted by arrival order. i.e. New events should be inserted to the END of all events with the same sendTime in the outputQ. In previous version, [b] will always insert new events to the FRONT of all events with the same sendTime!!! (this results wrong value of the outputPos in the state) */ // insertPos can not be NULL because at least one element exists if(insertPos==NULL) { cerr << "ERROR: OutputQueue::insert--insertPos = NULL for non-empty list!" << endl; exit(-2); } //A> newelem->object >= insertPos->object if(compare(newelem->object, insertPos->object) >= 0) { while(insertPos != NULL && compare(newelem->object, insertPos->object) >= 0) { insertPos = insertPos->next; } //now, insertPos is NULL if tail is reached (case 1); OR //it is the 1st element with sendTime greater than that of the newelem (case 2) if(insertPos==NULL){ // (case 1) newelem->prev = tail; tail->next = newelem; tail = newelem; newelem->next = NULL; } else {//case 2, insert just BEFORE the insertPos newelem->next = insertPos; newelem->prev = insertPos->prev; insertPos->prev->next = newelem; insertPos->prev = newelem; } } else { //B> newelem->object < insertPos->object while(insertPos != NULL && compare(newelem->object, insertPos->object) < 0) { insertPos = insertPos->prev; } //now, insertPos is NULL if head is reached (case 1); OR //it is the LAST elem with sendTime <= that of the newelem (case 2) if(insertPos == NULL){ // case 1 newelem->next = head; head->prev = newelem; head = newelem; newelem->prev = NULL; } else { //case 2, insert just AFTER the inputPos newelem->prev = insertPos; newelem->next = insertPos->next; insertPos->next->prev = newelem; insertPos->next = newelem; } } //end B> } //end non-empty list insertPos = newelem; listsize++; } #endif //JACKY_REVISION #endif //OUTPUT_QUEUE_CC