| 1 | // [Licence]
|
---|
| 2 | // The Ariba-Underlay Copyright
|
---|
| 3 | //
|
---|
| 4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
|
---|
| 5 | //
|
---|
| 6 | // Institute of Telematics
|
---|
| 7 | // Universität Karlsruhe (TH)
|
---|
| 8 | // Zirkel 2, 76128 Karlsruhe
|
---|
| 9 | // Germany
|
---|
| 10 | //
|
---|
| 11 | // Redistribution and use in source and binary forms, with or without
|
---|
| 12 | // modification, are permitted provided that the following conditions are
|
---|
| 13 | // met:
|
---|
| 14 | //
|
---|
| 15 | // 1. Redistributions of source code must retain the above copyright
|
---|
| 16 | // notice, this list of conditions and the following disclaimer.
|
---|
| 17 | // 2. Redistributions in binary form must reproduce the above copyright
|
---|
| 18 | // notice, this list of conditions and the following disclaimer in the
|
---|
| 19 | // documentation and/or other materials provided with the distribution.
|
---|
| 20 | //
|
---|
| 21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
|
---|
| 22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
---|
| 23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
---|
| 24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
|
---|
| 25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
---|
| 26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
---|
| 27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
---|
| 28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
---|
| 29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
---|
| 30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
---|
| 31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
---|
| 32 | //
|
---|
| 33 | // The views and conclusions contained in the software and documentation
|
---|
| 34 | // are those of the authors and should not be interpreted as representing
|
---|
| 35 | // official policies, either expressed or implied, of the Institute of
|
---|
| 36 | // Telematics.
|
---|
| 37 | // [Licence]
|
---|
| 38 |
|
---|
| 39 | #include "AribaOmnetModule.h"
|
---|
| 40 |
|
---|
| 41 | #include "ariba/utility/system/StartupWrapper.h" // circular inclusion
|
---|
| 42 | #include "ariba/interface/ServiceInterface.h"
|
---|
| 43 | using ariba::utility::StartupWrapper;
|
---|
| 44 | using ariba::interface::ServiceInterface;
|
---|
| 45 |
|
---|
| 46 | namespace ariba {
|
---|
| 47 | namespace communication {
|
---|
| 48 |
|
---|
| 49 | use_logging_cpp( AribaOmnetModule );
|
---|
| 50 |
|
---|
| 51 | AribaOmnetModule::AribaOmnetModule(){
|
---|
| 52 | }
|
---|
| 53 |
|
---|
| 54 | AribaOmnetModule::~AribaOmnetModule(){
|
---|
| 55 | }
|
---|
| 56 |
|
---|
| 57 | void AribaOmnetModule::setServerPort(uint16_t _port){
|
---|
| 58 | serverPort = _port;
|
---|
| 59 | }
|
---|
| 60 |
|
---|
| 61 | void AribaOmnetModule::start(){
|
---|
| 62 |
|
---|
| 63 | //ostringstream o;
|
---|
| 64 | //o << (void*)this;
|
---|
| 65 | //cout << "AribaOmnetModule " + o.str() + " start" << std::endl;
|
---|
| 66 |
|
---|
| 67 | Enter_Method_Silent();
|
---|
| 68 |
|
---|
| 69 | serverSocket.setCallbackObject(this);
|
---|
| 70 | serverSocket.setOutputGate(gate("tcpOut"));
|
---|
| 71 | serverSocket.bind( IPvXAddress(), serverPort );
|
---|
| 72 | serverSocket.listen();
|
---|
| 73 |
|
---|
| 74 | logging_debug( cModule::fullPath() << "listening on server socket" );
|
---|
| 75 | }
|
---|
| 76 |
|
---|
| 77 | void AribaOmnetModule::stop(){
|
---|
| 78 |
|
---|
| 79 | Enter_Method_Silent();
|
---|
| 80 |
|
---|
| 81 | logging_debug( "stopping module " << cModule::fullPath() );
|
---|
| 82 |
|
---|
| 83 | SocketMap::iterator i = sockets.begin();
|
---|
| 84 | SocketMap::iterator iend = sockets.end();
|
---|
| 85 |
|
---|
| 86 | for( ; i != iend; i++ )
|
---|
| 87 | i->second->close();
|
---|
| 88 |
|
---|
| 89 | serverSocket.close();
|
---|
| 90 | }
|
---|
| 91 |
|
---|
| 92 | TransportLocator::prot_t AribaOmnetModule::getId(){
|
---|
| 93 | return 6; // TCP
|
---|
| 94 | }
|
---|
| 95 |
|
---|
| 96 | const vector<TransportLocator*> AribaOmnetModule::getLocators(){
|
---|
| 97 | return vector<TransportLocator*>();
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | int AribaOmnetModule::numInitStages() const {
|
---|
| 101 | // the FlatNetworkConfiguration distributes the IP address in stage 3
|
---|
| 102 | // so to get the assigned IP address we init in stage 4 :)
|
---|
| 103 | return 4;
|
---|
| 104 | }
|
---|
| 105 |
|
---|
| 106 | void AribaOmnetModule::initialize(int stage){
|
---|
| 107 | if( stage != 3 ) return;
|
---|
| 108 |
|
---|
| 109 | StartupWrapper::initSystem();
|
---|
| 110 | StartupWrapper::initConfig( par("configfile").stringValue() );
|
---|
| 111 |
|
---|
| 112 | StartupWrapper::insertCurrentModule( this );
|
---|
| 113 |
|
---|
| 114 | logging_debug( "initializing " << cModule::fullPath() );
|
---|
| 115 | logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
|
---|
| 116 |
|
---|
| 117 | StartupInterface* service = NULL;
|
---|
| 118 | cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
|
---|
| 119 |
|
---|
| 120 | if( type != NULL ) {
|
---|
| 121 | logging_debug( "found module type ... creating ..." );
|
---|
| 122 | service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
|
---|
| 123 | } else {
|
---|
| 124 | logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
|
---|
| 125 | }
|
---|
| 126 |
|
---|
| 127 | if( service == NULL ){
|
---|
| 128 | logging_fatal( "no service defined for simulation. " <<
|
---|
| 129 | "service not loaded using load-libs in omnetpp.ini, " <<
|
---|
| 130 | " or ARIBA_SIMULATION_SERVICE() not used" );
|
---|
| 131 | } else {
|
---|
| 132 | StartupWrapper::startup( service );
|
---|
| 133 | }
|
---|
| 134 | }
|
---|
| 135 |
|
---|
| 136 | void AribaOmnetModule::handleMessage(cMessage* msg){
|
---|
| 137 |
|
---|
| 138 | logging_debug( cModule::fullPath() << " handling message" );
|
---|
| 139 | bool socketfound = false;
|
---|
| 140 |
|
---|
| 141 | SocketMap::iterator i = sockets.begin();
|
---|
| 142 | SocketMap::iterator iend = sockets.end();
|
---|
| 143 |
|
---|
| 144 | for( ; i != iend; i++ ){
|
---|
| 145 | if( i->second->belongsToSocket( msg )){
|
---|
| 146 | i->second->processMessage( msg );
|
---|
| 147 | socketfound = true;
|
---|
| 148 |
|
---|
| 149 | logging_debug( cModule::fullPath() << " found socket for message" );
|
---|
| 150 | break;
|
---|
| 151 | }
|
---|
| 152 | }
|
---|
| 153 |
|
---|
| 154 | if( ! socketfound ) {
|
---|
| 155 |
|
---|
| 156 | logging_debug( cModule::fullPath() << " creating new socket for message" );
|
---|
| 157 |
|
---|
| 158 | TCPSocket* dispatch = new TCPSocket( msg );
|
---|
| 159 | dispatch->setCallbackObject( this, dispatch );
|
---|
| 160 | dispatch->setOutputGate(gate("tcpOut"));
|
---|
| 161 |
|
---|
| 162 | ostringstream o;
|
---|
| 163 | o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
|
---|
| 164 |
|
---|
| 165 | sockets.insert( make_pair(o.str(), dispatch) );
|
---|
| 166 | dispatch->processMessage( msg );
|
---|
| 167 | }
|
---|
| 168 | }
|
---|
| 169 |
|
---|
| 170 | void AribaOmnetModule::finish(){
|
---|
| 171 | StartupWrapper::shutdown();
|
---|
| 172 | }
|
---|
| 173 |
|
---|
| 174 | seqnum_t AribaOmnetModule::sendMessage(const Message* message){
|
---|
| 175 |
|
---|
| 176 | Enter_Method_Silent();
|
---|
| 177 | logging_debug( cModule::fullPath() << " sending message" );
|
---|
| 178 |
|
---|
| 179 | //
|
---|
| 180 | // serialize the data, get the destination address
|
---|
| 181 | //
|
---|
| 182 |
|
---|
| 183 | Data data = data_serialize( message );
|
---|
| 184 | const_cast<Message*>(message)->dropPayload();
|
---|
| 185 |
|
---|
| 186 | const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
|
---|
| 187 | if( address == NULL ) return 0;
|
---|
| 188 |
|
---|
| 189 | size_t len = data.getLength()/8;
|
---|
| 190 | uint8_t* buffer = data.getBuffer();
|
---|
| 191 |
|
---|
| 192 | AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
|
---|
| 193 | outmsg->setPort( serverPort );
|
---|
| 194 | outmsg->setDataArraySize( len );
|
---|
| 195 | outmsg->setByteLength( len );
|
---|
| 196 |
|
---|
| 197 | for( size_t i=0; i<len; i++, buffer++)
|
---|
| 198 | outmsg->setData(i, *buffer);
|
---|
| 199 |
|
---|
| 200 | //
|
---|
| 201 | // find the socket for this endpoint
|
---|
| 202 | //
|
---|
| 203 |
|
---|
| 204 | SocketMap::iterator i = sockets.find( address->toString() );
|
---|
| 205 | TCPSocket* connectionSocket = NULL;
|
---|
| 206 |
|
---|
| 207 | if( i == sockets.end() ){
|
---|
| 208 |
|
---|
| 209 | logging_debug( cModule::fullPath() <<
|
---|
| 210 | " creating new socket, connecting and queueing message" );
|
---|
| 211 |
|
---|
| 212 | // don't have no connection yet for this endpoint
|
---|
| 213 | // initiate a connection and remember the message for later sending ...
|
---|
| 214 |
|
---|
| 215 | SocketMap::iterator ret = sockets.insert(
|
---|
| 216 | make_pair(address->toString(), new TCPSocket()) );
|
---|
| 217 | connectionSocket = ret->second;
|
---|
| 218 |
|
---|
| 219 | connectionSocket->setCallbackObject( this, connectionSocket );
|
---|
| 220 | connectionSocket->setOutputGate(gate("tcpOut"));
|
---|
| 221 |
|
---|
| 222 | pendingSends.insert( make_pair(connectionSocket, outmsg) );
|
---|
| 223 | connectionSocket->connect( address->getIP().c_str(), address->getPort() );
|
---|
| 224 |
|
---|
| 225 | } else {
|
---|
| 226 |
|
---|
| 227 | logging_debug( cModule::fullPath() << " found socket, just sending out message" );
|
---|
| 228 | connectionSocket = i->second;
|
---|
| 229 | connectionSocket->send( outmsg );
|
---|
| 230 | }
|
---|
| 231 |
|
---|
| 232 | //
|
---|
| 233 | // release the data and we are out!
|
---|
| 234 | //
|
---|
| 235 |
|
---|
| 236 | data.release();
|
---|
| 237 | return 0;
|
---|
| 238 | }
|
---|
| 239 |
|
---|
| 240 | void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
|
---|
| 241 |
|
---|
| 242 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 243 |
|
---|
| 244 | AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
|
---|
| 245 | if( encap == NULL ) return;
|
---|
| 246 |
|
---|
| 247 | logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
|
---|
| 248 |
|
---|
| 249 | size_t len = encap->getDataArraySize();
|
---|
| 250 | Data data( len*8 );
|
---|
| 251 | uint8_t* pnt = data.getBuffer();
|
---|
| 252 |
|
---|
| 253 | for( size_t i=0; i<len; i++, pnt++)
|
---|
| 254 | *pnt = encap->getData( i );
|
---|
| 255 |
|
---|
| 256 | Message* spovnetmsg = new Message(data);
|
---|
| 257 |
|
---|
| 258 | ostringstream o;
|
---|
| 259 | o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
|
---|
| 260 | spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
|
---|
| 261 |
|
---|
| 262 | logging_debug( cModule::fullPath() << " forwarding to base communication" );
|
---|
| 263 | MessageProvider::sendMessageToReceivers( spovnetmsg );
|
---|
| 264 |
|
---|
| 265 | delete encap;
|
---|
| 266 | }
|
---|
| 267 |
|
---|
| 268 | void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
|
---|
| 269 | logging_warn( cModule::fullPath() << " socket failure " << code );
|
---|
| 270 | }
|
---|
| 271 |
|
---|
| 272 | void AribaOmnetModule::socketClosed(int connId, void* socket){
|
---|
| 273 | logging_debug( cModule::fullPath() << " socket closed" );
|
---|
| 274 | }
|
---|
| 275 |
|
---|
| 276 | void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
|
---|
| 277 |
|
---|
| 278 | logging_debug( cModule::fullPath() << " socket peer closed" );
|
---|
| 279 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 280 |
|
---|
| 281 | SocketMap::iterator i = sockets.begin();
|
---|
| 282 | SocketMap::iterator iend = sockets.end();
|
---|
| 283 |
|
---|
| 284 | for( ; i != iend; i++ ){
|
---|
| 285 |
|
---|
| 286 | if( i->second == tcpsocket ){
|
---|
| 287 | sockets.erase( i );
|
---|
| 288 | delete tcpsocket;
|
---|
| 289 | break;
|
---|
| 290 | }
|
---|
| 291 | }
|
---|
| 292 | }
|
---|
| 293 |
|
---|
| 294 | void AribaOmnetModule::socketEstablished(int connId, void* socket){
|
---|
| 295 |
|
---|
| 296 | logging_debug( cModule::fullPath() << " socket established" );
|
---|
| 297 |
|
---|
| 298 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 299 | assert( tcpsocket != NULL );
|
---|
| 300 |
|
---|
| 301 | // if we have pending data for this socket
|
---|
| 302 | // we are on the client side and initiated the connection
|
---|
| 303 | // else, this is a dispatched socket on the server side
|
---|
| 304 |
|
---|
| 305 | PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
|
---|
| 306 | if( i != pendingSends.end() ){
|
---|
| 307 |
|
---|
| 308 | logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
|
---|
| 309 |
|
---|
| 310 | tcpsocket->send( i->second );
|
---|
| 311 | pendingSends.erase( i );
|
---|
| 312 |
|
---|
| 313 | } else {
|
---|
| 314 | logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
|
---|
| 315 | }
|
---|
| 316 | }
|
---|
| 317 |
|
---|
| 318 | void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
|
---|
| 319 | logging_debug( cModule::fullPath() << " socket status arrivede" );
|
---|
| 320 | }
|
---|
| 321 |
|
---|
| 322 | }} // namespace ariba, communication
|
---|