// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "AribaOmnetModule.h" #include "ariba/utility/system/StartupWrapper.h" // circular inclusion using ariba::utility::StartupWrapper; namespace ariba { namespace communication { use_logging_cpp( AribaOmnetModule ); AribaOmnetModule::AribaOmnetModule(){ } AribaOmnetModule::~AribaOmnetModule(){ } void AribaOmnetModule::setServerPort(uint16_t _port){ serverPort = _port; } void AribaOmnetModule::start(){ //ostringstream o; //o << (void*)this; //cout << "AribaOmnetModule " + o.str() + " start" << std::endl; Enter_Method_Silent(); serverSocket.setCallbackObject(this); serverSocket.setOutputGate(gate("tcpOut")); serverSocket.bind( IPvXAddress(), serverPort ); serverSocket.listen(); logging_debug( cModule::fullPath() << "listening on server socket" ); } void AribaOmnetModule::stop(){ Enter_Method_Silent(); logging_debug( "stopping module " << cModule::fullPath() ); SocketMap::iterator i = sockets.begin(); SocketMap::iterator iend = sockets.end(); for( ; i != iend; i++ ) i->second->close(); serverSocket.close(); } TransportLocator::prot_t AribaOmnetModule::getId(){ return 6; // TCP } const vector AribaOmnetModule::getLocators(){ return vector(); } int AribaOmnetModule::numInitStages() const { // the FlatNetworkConfiguration distributes the IP address in stage 3 // so to get the assigned IP address we init in stage 4 :) return 4; } void AribaOmnetModule::initialize(int stage){ if( stage != 3 ) return; StartupWrapper::initSystem(); StartupWrapper::initConfig( par("configfile").stringValue() ); StartupWrapper::insertCurrentModule( this ); logging_debug( "initializing " << cModule::fullPath() ); logging_debug( "AribaOmnetModule " << (void*)this << " initialize" ); StartupInterface* service = NULL; cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE ); if( type != NULL ) { logging_debug( "found module type ... creating ..." ); service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this ); } else { logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE ); } if( service == NULL ){ logging_fatal( "no service defined for simulation. " << "service not loaded using load-libs in omnetpp.ini, " << " or ARIBA_SIMULATION_SERVICE() not used" ); } else { StartupWrapper::startup( service ); } } void AribaOmnetModule::handleMessage(cMessage* msg){ logging_debug( cModule::fullPath() << " handling message" ); bool socketfound = false; SocketMap::iterator i = sockets.begin(); SocketMap::iterator iend = sockets.end(); for( ; i != iend; i++ ){ if( i->second->belongsToSocket( msg )){ i->second->processMessage( msg ); socketfound = true; logging_debug( cModule::fullPath() << " found socket for message" ); break; } } if( ! socketfound ) { logging_debug( cModule::fullPath() << " creating new socket for message" ); TCPSocket* dispatch = new TCPSocket( msg ); dispatch->setCallbackObject( this, dispatch ); dispatch->setOutputGate(gate("tcpOut")); ostringstream o; o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort(); sockets.insert( make_pair(o.str(), dispatch) ); dispatch->processMessage( msg ); } } void AribaOmnetModule::finish(){ StartupWrapper::shutdown(); } seqnum_t AribaOmnetModule::sendMessage(const Message* message){ Enter_Method_Silent(); logging_debug( cModule::fullPath() << " sending message" ); // // serialize the data, get the destination address // Data data = data_serialize( message ); const_cast(message)->dropPayload(); const IPv4Locator* address = dynamic_cast(message->getDestinationAddress()); if( address == NULL ) return 0; size_t len = data.getLength()/8; uint8_t* buffer = data.getBuffer(); AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage"); outmsg->setPort( serverPort ); outmsg->setDataArraySize( len ); outmsg->setByteLength( len ); for( size_t i=0; isetData(i, *buffer); // // find the socket for this endpoint // SocketMap::iterator i = sockets.find( address->toString() ); TCPSocket* connectionSocket = NULL; if( i == sockets.end() ){ logging_debug( cModule::fullPath() << " creating new socket, connecting and queueing message" ); // don't have no connection yet for this endpoint // initiate a connection and remember the message for later sending ... SocketMap::iterator ret = sockets.insert( make_pair(address->toString(), new TCPSocket()) ); connectionSocket = ret->second; connectionSocket->setCallbackObject( this, connectionSocket ); connectionSocket->setOutputGate(gate("tcpOut")); pendingSends.insert( make_pair(connectionSocket, outmsg) ); connectionSocket->connect( address->getIP().c_str(), address->getPort() ); } else { logging_debug( cModule::fullPath() << " found socket, just sending out message" ); connectionSocket = i->second; connectionSocket->send( outmsg ); } // // release the data and we are out! // data.release(); return 0; } void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){ TCPSocket* tcpsocket = (TCPSocket*)socket; AribaOmnetMessage* encap = dynamic_cast(msg); if( encap == NULL ) return; logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() ); size_t len = encap->getDataArraySize(); Data data( len*8 ); uint8_t* pnt = data.getBuffer(); for( size_t i=0; igetData( i ); Message* spovnetmsg = new Message(data); ostringstream o; o << tcpsocket->remoteAddress().str() << ":" << encap->getPort(); spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) ); logging_debug( cModule::fullPath() << " forwarding to base communication" ); MessageProvider::sendMessageToReceivers( spovnetmsg ); delete encap; } void AribaOmnetModule::socketFailure(int connId, void* socket, int code){ logging_warn( cModule::fullPath() << " socket failure " << code ); } void AribaOmnetModule::socketClosed(int connId, void* socket){ logging_debug( cModule::fullPath() << " socket closed" ); } void AribaOmnetModule::socketPeerClosed(int connId, void* socket){ logging_debug( cModule::fullPath() << " socket peer closed" ); TCPSocket* tcpsocket = (TCPSocket*)socket; SocketMap::iterator i = sockets.begin(); SocketMap::iterator iend = sockets.end(); for( ; i != iend; i++ ){ if( i->second == tcpsocket ){ sockets.erase( i ); delete tcpsocket; break; } } } void AribaOmnetModule::socketEstablished(int connId, void* socket){ logging_debug( cModule::fullPath() << " socket established" ); TCPSocket* tcpsocket = (TCPSocket*)socket; assert( tcpsocket != NULL ); // if we have pending data for this socket // we are on the client side and initiated the connection // else, this is a dispatched socket on the server side PendingSendQueue::iterator i = pendingSends.find( tcpsocket ); if( i != pendingSends.end() ){ logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" ); tcpsocket->send( i->second ); pendingSends.erase( i ); } else { logging_debug( cModule::fullPath() << " dispatch socket established ... server side" ); } } void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){ logging_debug( cModule::fullPath() << " socket status arrivede" ); } }} // namespace ariba, communication, internal