#include "cppwrapper.h" #include "root.h" //#include "port.h" #include "time.h" #include "coupled.h" #include "procadm.h" //#include "model.h" #include "portlist.h" #include "atomcell.h" #include "coupled.h" #include "modeladm.h" #include "msgadm.h" #include //#include "except.h" #include "message.h" #include "mainsimu.h" #include #include #include #include "mainsimu.h" //class Coupled; CPPWrapper* SingleCPPWrapper::instance(0); CPPWrapper::CPPWrapper() { messageMonitorStarted = false; machineId = -1; fcId = Processor::InvalidId; procMachineIds[ProcessorAdmin::rootId] = 0; debugLog.open("debugInfo.log",ios::in); debugLog << "Debug Info Log File used to debugiing CD++ " << endl ; debugLog << "===============================================================" << endl << flush; } CPPWrapper& CPPWrapper::getCurrentSimTime() { int result ; string simTime_ = ((Time) Root::Instance().lastChg).asString(); // cout << "in getCurrenSimTime , simTime_ is " << simTime_ << endl; messageBuffer msg; msg.mtype = WRAPPER_ORIGINATED; msg.specific_type = CURRENT_SIMULATION_TIME_RESPONSE; msg.session_id = sessionID; strcpy(msg.charParam1,simTime_.c_str()); // cout << "before send_message in (getCurrentSimTime) " << endl; result = send_message(send_queue_id, &msg); // cout << "after send_message in (getCurrentSimTime) " << endl; return *this; } CPPWrapper& CPPWrapper::insertExternalEvent( messageBuffer* msg) { string event_time_str =string(msg->charParam1); string event_port_str = string(msg->charParam2); double value = msg->doubleParam1; Time event_time(event_time_str); Time& event_time_ = *(&event_time); // Root::Instance().initialize(); //Rami // Port & event_port( Root::Instance().top().port(event_port_str)); Coupled &top = static_cast(SingleModelAdm::Instance().model("top")); Port & event_port = top.port(event_port_str) ; Real event_value(value); Event external_event(event_time, &event_port,event_value); const Time& currentSimTime = Root::Instance().lastChg; // SingleCPPWrapper::Instance().getCurrentSimTime(); if (( event_time_ > currentSimTime) || ( event_time_ == currentSimTime)) { Root::Instance().externalEvents.push_back(external_event); Root::Instance().externalEvents.sort(); // resumeSimThread(); EventList::const_iterator cursor; /* cout << "==============================================" << endl; for(cursor = Root::Instance().externalEvents.begin(); cursor != Root::Instance().externalEvents.end(); cursor++) cout << (*cursor).asString() << endl; cout << "==============================================" << endl; */ } else { // cout << "Warning: Event received with time stamp in the past " << event_time.asString() << " @ " // << currentSimTime.asString() << endl; } } CPPWrapper& CPPWrapper::setJniParams(JNIEnv * env_ , jobject obj_) { this->jni_env = env_; this->jni_obj = obj_; return *this; } void CPPWrapper::setSessionLogFileName(string file_name) { sessionLogFileName = file_name; if (sessionLog.is_open()) sessionLog.close(); sessionLog.open(sessionLogFileName.c_str(),ios::app); } ofstream& CPPWrapper::sessionLogStream() { return sessionLog; } ostream& CPPWrapper::debugLogStream() { return debugLog; } CPPWrapper& CPPWrapper::saveDebugInfo() { } CPPWrapper& CPPWrapper::setSessionID(int sessionID_) { sessionID = sessionID_; return *this; } int CPPWrapper::getSessionID() const { return sessionID; } CPPWrapper& CPPWrapper::setPath(string path_) { cdppPath = path_; // cout << "CPPWrapper:: the cdppPath has ben set to " << cdppPath << endl; return *this; } string CPPWrapper::getPath() const { return cdppPath; } CPPWrapper& CPPWrapper::initializeMessageQueues() { key_t send_queue_key, receive_queue_key; send_queue_key = ftok(cdppPath.c_str(),'r'); receive_queue_key = ftok(cdppPath.c_str(),'s'); this->send_queue_id = open_queue(send_queue_key); this->receive_queue_id = open_queue(receive_queue_key); // cout << "send_queue_id: " << send_queue_id << endl; // cout << "receive_queue_id: " << receive_queue_id << endl; return *this; } CPPWrapper& CPPWrapper::startMessageMonitor() { int iret1; iret1 = pthread_create(&(this->messageMonitorThread), NULL,messageMonitor, (void *) this->receive_queue_id); time_t rawtime; time(&rawtime); /* if (iret1 != -1) cout << "started the message monitor " << this->receive_queue_id << "@" << ctime(&rawtime) << endl; else cout << "error starting the message monitor " << endl; */ return *this; } CPPWrapper::~CPPWrapper() { sessionLog.close(); } int CPPWrapper::machineForModel(const string model_name) { int result; messageBuffer requestMsg, responseMsg; requestMsg.mtype = CDPP_ORIGINATED; requestMsg.specific_type = MACHINE_FOR_MODEL_REQUEST; requestMsg.session_id = sessionID; strcpy(requestMsg.charParam1, model_name.c_str()); result = send_message(send_queue_id, &requestMsg); if (result != -1) read_message( receive_queue_id , CDPP_ORIGINATED, &responseMsg); if (responseMsg.specific_type == MACHINE_FOR_MODEL_RESPONSE && responseMsg.session_id == sessionID) return responseMsg.longParam1; else return (-1); } CPPWrapper& CPPWrapper::stop(bool calledByRoot) { int result = -1; messageBuffer msg; msg.mtype = CDPP_ORIGINATED; msg.specific_type = SIMULATION_STOP; msg.session_id = sessionID; result = send_message(send_queue_id, &msg); if (result == -1) cout << "error while sending message to stop the simulation" << endl; if ( this->getMachineID() == 0 ) { if( !calledByRoot ) Root::Instance().stop(true); } else { SingleMsgAdm::Instance().stop(); } return *this; } int CPPWrapper::getMachineID() { if (machineId == -1) { int result = -1; messageBuffer requestMsg,responseMsg; requestMsg.mtype = CDPP_ORIGINATED; requestMsg.specific_type = GET_MACHINE_ID_REQUEST; requestMsg.session_id = sessionID; result = send_message(send_queue_id, &requestMsg); if (result != -1) read_message(receive_queue_id, CDPP_ORIGINATED, &responseMsg); if (responseMsg.specific_type == GET_MACHINE_ID_RESPONSE && responseMsg.session_id == sessionID) machineId = responseMsg.longParam1; else { cout << "session ID is " << responseMsg.session_id << " machineId is : " << responseMsg.longParam1 << endl; cout << "got an invalid response : responseSpecificType is " << responseMsg.specific_type << endl; } } return machineId; } const ProcMachineIDs &CPPWrapper::getProcMachineIds() const { return procMachineIds; } CPPWrapper &CPPWrapper::setMachineForProcId(const ProcId &pid, const MachineId &mid) { MASSERT( procMachineIds.find(pid) == procMachineIds.end() ); procMachineIds[pid] = mid; return *this; } const MachineId CPPWrapper::machineForProcId(const ProcId &pid) const { ProcMachineIDs::const_iterator cursor; cursor = procMachineIds.find(pid) ; if( cursor != procMachineIds.end() ) return cursor->second ; else return -1; } CPPWrapper &CPPWrapper::printMachineForProc() { ProcMachineIDs::const_iterator cursor; cursor = procMachineIds.begin(); cout << "****************************************************" << endl; cout << "Printint the machine to procID mapping " << endl; for( ; cursor != procMachineIds.end() ; cursor++ ) cout << " pid " << cursor->first << " MachineId : " << cursor->second << endl; cout << "******************************************************" << endl; return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage(const InitMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending InitMessage to " << pid << endl; int result = -1; const char* message_time; messageBuffer initMsg ; initMsg.session_id = sessionID; initMsg.mtype = CDPP_ORIGINATED; initMsg.specific_type = INIT_MESSAGE ; message_time = (msg.time().asString()).c_str(); strcpy(initMsg.charParam1, message_time); initMsg.longParam1 = msg.procId(); initMsg.longParam4 = mid; initMsg.longParam5 = pid; result = send_message( send_queue_id, &initMsg); return *this; } CPPWrapper& CPPWrapper::sendEchoMessage(const EchoMessage &msg, const MachineId &mid) { // debugLog << "before sending InitMessage to " << pid << endl; int result = -1; const char* message_time; messageBuffer echoMsg ; echoMsg.session_id = sessionID; echoMsg.mtype = CDPP_ORIGINATED; echoMsg.specific_type = ECHO_MESSAGE ; message_time = (msg.time().asString()).c_str(); strcpy(echoMsg.charParam1, message_time); echoMsg.longParam1 = -1 ; echoMsg.longParam4 = mid; echoMsg.longParam5 = -1; if( msg.getSendTimeSec() == -1 && msg.getSendTimeUSec() == -1) { struct timeval sendTime; gettimeofday( &sendTime, (struct timezone*) 0); echoMsg.longParam2 = sendTime.tv_sec; echoMsg.longParam3 = sendTime.tv_usec; } else { echoMsg.longParam2 = msg.getSendTimeSec(); echoMsg.longParam3 = msg.getSendTimeUSec(); } result = send_message( send_queue_id, &echoMsg); return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage( const DoneMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending DoneMessage to " << pid << endl; int result = -1; const char* message_time; const char* next_change; messageBuffer doneMsg ; doneMsg.session_id = sessionID; doneMsg.mtype = CDPP_ORIGINATED ; doneMsg.specific_type = DONE_MESSAGE ; message_time = ((msg.time()).asString()).c_str(); strcpy(doneMsg.charParam1 , message_time); doneMsg.longParam1 = msg.procId(); next_change = (msg.nextChange().asString()).c_str(); strcpy( doneMsg.charParam2 , next_change ); doneMsg.boolParam1 = msg.isFromSlave(); doneMsg.longParam4 = mid; doneMsg.longParam5 = pid; result = send_message(send_queue_id, &doneMsg); return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage(const CollectMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending CollectMessage to " << pid << endl; int result = -1; const char* message_time; messageBuffer collectMsg ; collectMsg.session_id = sessionID; collectMsg.mtype = CDPP_ORIGINATED; collectMsg.specific_type = COLLECT_MESSAGE; message_time = (msg.time().asString()).c_str(); strcpy(collectMsg.charParam1 , message_time); collectMsg.longParam1 = msg.procId(); collectMsg.longParam4 = mid; collectMsg.longParam5 = pid; result = send_message(send_queue_id , &collectMsg); return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage(const InternalMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending InternalMessage to " << pid << endl; int result = -1; const char* message_time; messageBuffer internalMsg ; internalMsg.session_id = sessionID; internalMsg.mtype = CDPP_ORIGINATED; internalMsg.specific_type = INTERNAL_MESSAGE; message_time = (msg.time().asString()).c_str(); strcpy(internalMsg.charParam1 , message_time); internalMsg.longParam1 = msg.procId(); internalMsg.longParam4 = mid; internalMsg.longParam5 = pid; result = send_message( send_queue_id , &internalMsg); return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage(const ExternalMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending ExternalMessage to " << pid << endl; int result = -1; const char* message_time; // const char* cell_position; messageBuffer externalMsg ; externalMsg.session_id = sessionID; externalMsg.mtype = CDPP_ORIGINATED; externalMsg.specific_type = EXTERNAL_MESSAGE; message_time = (msg.time().asString()).c_str(); strcpy( externalMsg.charParam1, message_time); externalMsg.longParam1 = msg.procId() ; externalMsg.longParam2 = msg.port().id(); externalMsg.doubleParam1 = msg.value(); externalMsg.longParam3 = msg.senderModelId(); // cell_position = (msg.senderCellPosition().print()).c_str(); // strcpy( externalMsg.charParam3 , cell_position ) ; // Rami: this may not be used // since the senderCellPosition is only used for FlatCoupledCell which can NOT // be split into different machines externalMsg.longParam4 = mid; externalMsg.longParam5 = pid; result = send_message( send_queue_id , &externalMsg); return *this; } CPPWrapper& CPPWrapper::sendRemoteMessage(const OutputMessage &msg, const MachineId &mid, const ProcId &pid) { // debugLog << "before sending OutputMessage to " << pid << endl; int result = -1; const char* message_time; messageBuffer outputMsg ; outputMsg.session_id = sessionID; outputMsg.mtype = CDPP_ORIGINATED; outputMsg.specific_type = OUTPUT_MESSAGE; message_time = (msg.time().asString()).c_str(); strcpy( outputMsg.charParam1, message_time); outputMsg.longParam1 = msg.procId(); outputMsg.longParam2 = msg.port().id(); outputMsg.doubleParam1 = msg.value(); outputMsg.longParam4 = mid; outputMsg.longParam5 = pid; result = send_message(send_queue_id, &outputMsg); return *this; } CPPWrapper& CPPWrapper::receiveRemoteMessage(const messageBuffer &msgBuffer) { struct timeval msgStartTime, msgEndTime; gettimeofday(&msgEndTime, (struct timezone*) 0); msgStartTime.tv_sec = msgBuffer.longParam6; msgStartTime.tv_usec = msgBuffer.longParam7; double timeDiff = MainSimulator::Instance().calTimeDiff( msgStartTime, msgEndTime); messageTransferDelays.push_back(timeDiff) ; string infStr("..."); string msgTimeStr(msgBuffer.charParam1); Time msgTime( msgTimeStr ); if(msgTimeStr == infStr) msgTime = Time::Inf ; ProcId srcPid = msgBuffer.longParam1 ; ProcId destPid = msgBuffer.longParam5; switch( msgBuffer.specific_type) { case INIT_MESSAGE: { InitMessage initMsg(msgTime, srcPid); // debugLog << "received Remote Message " << initMsg.asString() << endl; #ifdef WITH_PCDPP if (SingleCPPWrapper::Instance().getFCId() == Processor::InvalidId) SingleCPPWrapper::Instance().setFCId( initMsg.procId() ); #endif SingleMsgAdm::Instance().send( initMsg , destPid ); break; } case DONE_MESSAGE: { string nextChgStr(msgBuffer.charParam2); Time nextChg(nextChgStr); if( nextChgStr == infStr ) nextChg = Time::Inf ; bool isFromSlave = msgBuffer.boolParam1 ; DoneMessage doneMsg( msgTime, srcPid, nextChg, isFromSlave); SingleMsgAdm::Instance().send( doneMsg, destPid ); break; } case COLLECT_MESSAGE: { CollectMessage collectMsg(msgTime, srcPid); SingleMsgAdm::Instance().send( collectMsg, destPid); break; } case INTERNAL_MESSAGE: { InternalMessage internalMsg( msgTime, srcPid); SingleMsgAdm::Instance().send( internalMsg, destPid); break; } case EXTERNAL_MESSAGE: { Model *model = static_cast(&SingleProcessorAdmin::Instance().model( srcPid )); PortList &inPorts = model->inputPorts(); int portId = msgBuffer.longParam2; Port *port = 0; double value; /* PortList::const_iterator cursor; cursor = inPorts.find(portId); if( cursor != inPorts.end() ) port = cursor->second ; */ port = getPortFromList(portId) ; // cout << "CPPWrapper:receiveRemoteMessage before MASSERTMSG " << endl; MASSERTMSG( port != 0 , "could NOT find the output port in CPPWrapper::receiveRemoteMessage() " ); value = msgBuffer.doubleParam1 ; ExternalMessage extMsg( msgTime, srcPid , *port , value); extMsg.senderModelId( msgBuffer.longParam3 ); SingleMsgAdm::Instance().send( extMsg , destPid ); break; } case OUTPUT_MESSAGE: { Model *model = static_cast(&SingleProcessorAdmin::Instance().model( srcPid )) ; PortList &outPorts = model->outputPorts(); int portId = msgBuffer.longParam2; PortList::const_iterator cursor; Port *port = 0; double value; value = msgBuffer.doubleParam1; /* cursor = outPorts.find(portId); if( cursor != outPorts.end() ) port = cursor->second ; */ port = getPortFromList(portId); MASSERTMSG( port != 0 , "could NOT find the output port in CPPWrapper::receiveRemoteMessage() " ); OutputMessage outMsg( msgTime , srcPid ,*port, value ); SingleMsgAdm::Instance().send( outMsg, destPid ); break; } } } CPPWrapper &CPPWrapper::receiveEchoMessage(const messageBuffer &msgBuffer) { if ( getMachineID() != 0 ) { string msgTime = string( msgBuffer.charParam1 ); ProcId pid = -1; long sendTimeSec = msgBuffer.longParam2 ; long sendTimeUSec = msgBuffer.longParam3 ; EchoMessage echoMsg(msgTime, pid, sendTimeSec , sendTimeUSec); this->sendEchoMessage(echoMsg , 0 ); } else { struct timeval sendTime ,receiveTime; sendTime.tv_sec = msgBuffer.longParam2; sendTime.tv_usec = msgBuffer.longParam3; gettimeofday( &receiveTime, (struct timezone*) 0 ); double transferTime = MainSimulator::Instance().calTimeDiff( sendTime, receiveTime); SOAPTransferDelays.push_back(transferTime); } return *this; } CPPWrapper &CPPWrapper::addPortToList(const PortId & id, Port* port) { PortList::const_iterator cursor; cursor = allPorts.find(id); if(cursor == allPorts.end() ) allPorts[id] = port; return *this; } Port* CPPWrapper::getPortFromList(const PortId & id) { PortList::const_iterator cursor; cursor = allPorts.find(id); if(cursor != allPorts.end() ) return cursor->second; } CPPWrapper &CPPWrapper::addZonePartition(const char* zoneChar, const MachineId &mid) { // cout << "CPPWrapper::addZonePartition adding " << zoneChar << " : " << mid << endl; string zone(zoneChar); ZonePartitions::const_iterator cursor; cursor = zonePartitions.find(zone); if( cursor == zonePartitions.end() ) zonePartitions[zone] = mid; return *this; } CPPWrapper &CPPWrapper::setFCId(const ProcId& id) { fcId = id; return *this; } CPPWrapper &CPPWrapper::calSOAPMessageDelay() { if ( getMachineID() == 0 ) { for ( int i = 0 ; i < ECHO_MESSAGE_COUNT ; i++) { Time msgTime("50:50:50:000"); EchoMessage echoMsg(msgTime, -1); this->sendEchoMessage(echoMsg, 1); } } while ( SOAPTransferDelays.size() < ECHO_MESSAGE_COUNT ) ; if (SOAPTransferDelays.size() == ECHO_MESSAGE_COUNT ) { double totalTime = 0; list::const_iterator cursor = SOAPTransferDelays.begin(); for( ; cursor != SOAPTransferDelays.end() ; cursor++) totalTime += *cursor; cout << "*******************************************************************\n"; cout << "The total number of SOAP Echo Messages is : " << SOAPTransferDelays.size() << endl; cout << "The total transfer time is " << totalTime << endl; cout << "The average SOAP transfer delay is : " << ( (double) totalTime/SOAPTransferDelays.size()/2 ) << " m seconds" << endl ; cout << "*******************************************************************\n"; } else if ( SOAPTransferDelays.size() == 0 ) { cout << "*******************************************************************\n"; cout << "No SOAP messages were sent in this session" << endl; cout << "*******************************************************************\n"; } return *this; } CPPWrapper &CPPWrapper::addCPPTransferDelay() { double transferTime = MainSimulator::Instance().calTimeDiff( messageSendTime, messageReceiveTime); cppTransferDelays.push_back(transferTime); return *this; } CPPWrapper &CPPWrapper::calCPPMessageDelay() { list::const_iterator cursor; double totalTime = 0; for( cursor = cppTransferDelays.begin(); cursor != cppTransferDelays.end(); cursor++) { totalTime += *cursor ; } cout << "*******************************************************************\n"; cout << "The number of CPP (local) Messages considered into calculation : " << cppTransferDelays.size() << endl; cout << "The average CPP (local) Messages delay : " << ( (double) totalTime/cppTransferDelays.size() ) << "m seconds" << endl; cout << "*******************************************************************\n"; return *this; }