[5641] | 1 | // [License]
|
---|
| 2 | // The Ariba-Underlay Copyright
|
---|
| 3 | //
|
---|
| 4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
|
---|
| 5 | //
|
---|
| 6 | // Institute of Telematics
|
---|
| 7 | // UniversitÀt Karlsruhe (TH)
|
---|
| 8 | // Zirkel 2, 76128 Karlsruhe
|
---|
| 9 | // Germany
|
---|
| 10 | //
|
---|
| 11 | // Redistribution and use in source and binary forms, with or without
|
---|
| 12 | // modification, are permitted provided that the following conditions are
|
---|
| 13 | // met:
|
---|
| 14 | //
|
---|
| 15 | // 1. Redistributions of source code must retain the above copyright
|
---|
| 16 | // notice, this list of conditions and the following disclaimer.
|
---|
| 17 | // 2. Redistributions in binary form must reproduce the above copyright
|
---|
| 18 | // notice, this list of conditions and the following disclaimer in the
|
---|
| 19 | // documentation and/or other materials provided with the distribution.
|
---|
| 20 | //
|
---|
| 21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
|
---|
| 22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
---|
| 23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
---|
| 24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
|
---|
| 25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
---|
| 26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
---|
| 27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
---|
| 28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
---|
| 29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
---|
| 30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
---|
| 31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
---|
| 32 | //
|
---|
| 33 | // The views and conclusions contained in the software and documentation
|
---|
| 34 | // are those of the authors and should not be interpreted as representing
|
---|
| 35 | // official policies, either expressed or implied, of the Institute of
|
---|
| 36 | // Telematics.
|
---|
| 37 | // [License]
|
---|
| 38 |
|
---|
| 39 | #include "AribaOmnetModule.h"
|
---|
| 40 |
|
---|
| 41 | #include "ariba/utility/system/StartupWrapper.h" // circular inclusion
|
---|
| 42 | using ariba::utility::StartupWrapper;
|
---|
| 43 |
|
---|
| 44 | namespace ariba {
|
---|
| 45 | namespace communication {
|
---|
| 46 |
|
---|
| 47 | use_logging_cpp( AribaOmnetModule );
|
---|
| 48 |
|
---|
| 49 | AribaOmnetModule::AribaOmnetModule(){
|
---|
| 50 | }
|
---|
| 51 |
|
---|
| 52 | AribaOmnetModule::~AribaOmnetModule(){
|
---|
| 53 | }
|
---|
| 54 |
|
---|
| 55 | void AribaOmnetModule::setServerPort(uint16_t _port){
|
---|
| 56 | serverPort = _port;
|
---|
| 57 | }
|
---|
| 58 |
|
---|
| 59 | void AribaOmnetModule::start(){
|
---|
| 60 |
|
---|
| 61 | //ostringstream o;
|
---|
| 62 | //o << (void*)this;
|
---|
| 63 | //cout << "AribaOmnetModule " + o.str() + " start" << std::endl;
|
---|
| 64 |
|
---|
| 65 | Enter_Method_Silent();
|
---|
| 66 |
|
---|
| 67 | serverSocket.setCallbackObject(this);
|
---|
| 68 | serverSocket.setOutputGate(gate("tcpOut"));
|
---|
| 69 | serverSocket.bind( IPvXAddress(), serverPort );
|
---|
| 70 | serverSocket.listen();
|
---|
| 71 |
|
---|
| 72 | logging_debug( cModule::fullPath() << "listening on server socket" );
|
---|
| 73 | }
|
---|
| 74 |
|
---|
| 75 | void AribaOmnetModule::stop(){
|
---|
| 76 |
|
---|
| 77 | Enter_Method_Silent();
|
---|
| 78 |
|
---|
| 79 | logging_debug( "stopping module " << cModule::fullPath() );
|
---|
| 80 |
|
---|
| 81 | SocketMap::iterator i = sockets.begin();
|
---|
| 82 | SocketMap::iterator iend = sockets.end();
|
---|
| 83 |
|
---|
| 84 | for( ; i != iend; i++ )
|
---|
| 85 | i->second->close();
|
---|
| 86 |
|
---|
| 87 | serverSocket.close();
|
---|
| 88 | }
|
---|
| 89 |
|
---|
| 90 | TransportLocator::prot_t AribaOmnetModule::getId(){
|
---|
| 91 | return 6; // TCP
|
---|
| 92 | }
|
---|
| 93 |
|
---|
| 94 | const vector<TransportLocator*> AribaOmnetModule::getLocators(){
|
---|
| 95 | return vector<TransportLocator*>();
|
---|
| 96 | }
|
---|
| 97 |
|
---|
| 98 | int AribaOmnetModule::numInitStages() const {
|
---|
| 99 | // the FlatNetworkConfiguration distributes the IP address in stage 3
|
---|
| 100 | // so to get the assigned IP address we init in stage 4 :)
|
---|
| 101 | return 4;
|
---|
| 102 | }
|
---|
| 103 |
|
---|
| 104 | void AribaOmnetModule::initialize(int stage){
|
---|
| 105 | if( stage != 3 ) return;
|
---|
| 106 |
|
---|
| 107 | StartupWrapper::initSystem();
|
---|
| 108 | StartupWrapper::initConfig( par("configfile").stringValue() );
|
---|
| 109 |
|
---|
| 110 | StartupWrapper::insertCurrentModule( this );
|
---|
| 111 |
|
---|
| 112 | logging_debug( "initializing " << cModule::fullPath() );
|
---|
| 113 | logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
|
---|
| 114 |
|
---|
| 115 | StartupInterface* service = NULL;
|
---|
| 116 | cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
|
---|
| 117 |
|
---|
| 118 | if( type != NULL ) {
|
---|
| 119 | logging_debug( "found module type ... creating ..." );
|
---|
| 120 | service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
|
---|
| 121 | } else {
|
---|
| 122 | logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
|
---|
| 123 | }
|
---|
| 124 |
|
---|
| 125 | if( service == NULL ){
|
---|
| 126 | logging_fatal( "no service defined for simulation. " <<
|
---|
| 127 | "service not loaded using load-libs in omnetpp.ini, " <<
|
---|
| 128 | " or ARIBA_SIMULATION_SERVICE() not used" );
|
---|
| 129 | } else {
|
---|
| 130 | StartupWrapper::startup( service );
|
---|
| 131 | }
|
---|
| 132 | }
|
---|
| 133 |
|
---|
| 134 | void AribaOmnetModule::handleMessage(cMessage* msg){
|
---|
| 135 |
|
---|
| 136 | logging_debug( cModule::fullPath() << " handling message" );
|
---|
| 137 | bool socketfound = false;
|
---|
| 138 |
|
---|
| 139 | SocketMap::iterator i = sockets.begin();
|
---|
| 140 | SocketMap::iterator iend = sockets.end();
|
---|
| 141 |
|
---|
| 142 | for( ; i != iend; i++ ){
|
---|
| 143 | if( i->second->belongsToSocket( msg )){
|
---|
| 144 | i->second->processMessage( msg );
|
---|
| 145 | socketfound = true;
|
---|
| 146 |
|
---|
| 147 | logging_debug( cModule::fullPath() << " found socket for message" );
|
---|
| 148 | break;
|
---|
| 149 | }
|
---|
| 150 | }
|
---|
| 151 |
|
---|
| 152 | if( ! socketfound ) {
|
---|
| 153 |
|
---|
| 154 | logging_debug( cModule::fullPath() << " creating new socket for message" );
|
---|
| 155 |
|
---|
| 156 | TCPSocket* dispatch = new TCPSocket( msg );
|
---|
| 157 | dispatch->setCallbackObject( this, dispatch );
|
---|
| 158 | dispatch->setOutputGate(gate("tcpOut"));
|
---|
| 159 |
|
---|
| 160 | ostringstream o;
|
---|
| 161 | o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
|
---|
| 162 |
|
---|
| 163 | sockets.insert( make_pair(o.str(), dispatch) );
|
---|
| 164 | dispatch->processMessage( msg );
|
---|
| 165 | }
|
---|
| 166 | }
|
---|
| 167 |
|
---|
| 168 | void AribaOmnetModule::finish(){
|
---|
| 169 | StartupWrapper::shutdown();
|
---|
| 170 | }
|
---|
| 171 |
|
---|
| 172 | seqnum_t AribaOmnetModule::sendMessage(const Message* message){
|
---|
| 173 |
|
---|
| 174 | Enter_Method_Silent();
|
---|
| 175 | logging_debug( cModule::fullPath() << " sending message" );
|
---|
| 176 |
|
---|
| 177 | //
|
---|
| 178 | // serialize the data, get the destination address
|
---|
| 179 | //
|
---|
| 180 |
|
---|
| 181 | Data data = data_serialize( message );
|
---|
| 182 | const_cast<Message*>(message)->dropPayload();
|
---|
| 183 |
|
---|
| 184 | const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
|
---|
| 185 | if( address == NULL ) return 0;
|
---|
| 186 |
|
---|
| 187 | size_t len = data.getLength()/8;
|
---|
| 188 | uint8_t* buffer = data.getBuffer();
|
---|
| 189 |
|
---|
| 190 | AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
|
---|
| 191 | outmsg->setPort( serverPort );
|
---|
| 192 | outmsg->setDataArraySize( len );
|
---|
| 193 | outmsg->setByteLength( len );
|
---|
| 194 |
|
---|
| 195 | for( size_t i=0; i<len; i++, buffer++)
|
---|
| 196 | outmsg->setData(i, *buffer);
|
---|
| 197 |
|
---|
| 198 | //
|
---|
| 199 | // find the socket for this endpoint
|
---|
| 200 | //
|
---|
| 201 |
|
---|
| 202 | SocketMap::iterator i = sockets.find( address->toString() );
|
---|
| 203 | TCPSocket* connectionSocket = NULL;
|
---|
| 204 |
|
---|
| 205 | if( i == sockets.end() ){
|
---|
| 206 |
|
---|
| 207 | logging_debug( cModule::fullPath() <<
|
---|
| 208 | " creating new socket, connecting and queueing message" );
|
---|
| 209 |
|
---|
| 210 | // don't have no connection yet for this endpoint
|
---|
| 211 | // initiate a connection and remember the message for later sending ...
|
---|
| 212 |
|
---|
| 213 | SocketMap::iterator ret = sockets.insert(
|
---|
| 214 | make_pair(address->toString(), new TCPSocket()) );
|
---|
| 215 | connectionSocket = ret->second;
|
---|
| 216 |
|
---|
| 217 | connectionSocket->setCallbackObject( this, connectionSocket );
|
---|
| 218 | connectionSocket->setOutputGate(gate("tcpOut"));
|
---|
| 219 |
|
---|
| 220 | pendingSends.insert( make_pair(connectionSocket, outmsg) );
|
---|
| 221 | connectionSocket->connect( address->getIP().c_str(), address->getPort() );
|
---|
| 222 |
|
---|
| 223 | } else {
|
---|
| 224 |
|
---|
| 225 | logging_debug( cModule::fullPath() << " found socket, just sending out message" );
|
---|
| 226 | connectionSocket = i->second;
|
---|
| 227 | connectionSocket->send( outmsg );
|
---|
| 228 | }
|
---|
| 229 |
|
---|
| 230 | //
|
---|
| 231 | // release the data and we are out!
|
---|
| 232 | //
|
---|
| 233 |
|
---|
| 234 | data.release();
|
---|
| 235 | return 0;
|
---|
| 236 | }
|
---|
| 237 |
|
---|
| 238 | void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
|
---|
| 239 |
|
---|
| 240 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 241 |
|
---|
| 242 | AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
|
---|
| 243 | if( encap == NULL ) return;
|
---|
| 244 |
|
---|
| 245 | logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
|
---|
| 246 |
|
---|
| 247 | size_t len = encap->getDataArraySize();
|
---|
| 248 | Data data( len*8 );
|
---|
| 249 | uint8_t* pnt = data.getBuffer();
|
---|
| 250 |
|
---|
| 251 | for( size_t i=0; i<len; i++, pnt++)
|
---|
| 252 | *pnt = encap->getData( i );
|
---|
| 253 |
|
---|
| 254 | Message* spovnetmsg = new Message(data);
|
---|
| 255 |
|
---|
| 256 | ostringstream o;
|
---|
| 257 | o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
|
---|
| 258 | spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
|
---|
| 259 |
|
---|
| 260 | logging_debug( cModule::fullPath() << " forwarding to base communication" );
|
---|
| 261 | MessageProvider::sendMessageToReceivers( spovnetmsg );
|
---|
| 262 |
|
---|
| 263 | delete encap;
|
---|
| 264 | }
|
---|
| 265 |
|
---|
| 266 | void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
|
---|
| 267 | logging_warn( cModule::fullPath() << " socket failure " << code );
|
---|
| 268 | }
|
---|
| 269 |
|
---|
| 270 | void AribaOmnetModule::socketClosed(int connId, void* socket){
|
---|
| 271 | logging_debug( cModule::fullPath() << " socket closed" );
|
---|
| 272 | }
|
---|
| 273 |
|
---|
| 274 | void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
|
---|
| 275 |
|
---|
| 276 | logging_debug( cModule::fullPath() << " socket peer closed" );
|
---|
| 277 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 278 |
|
---|
| 279 | SocketMap::iterator i = sockets.begin();
|
---|
| 280 | SocketMap::iterator iend = sockets.end();
|
---|
| 281 |
|
---|
| 282 | for( ; i != iend; i++ ){
|
---|
| 283 |
|
---|
| 284 | if( i->second == tcpsocket ){
|
---|
| 285 | sockets.erase( i );
|
---|
| 286 | delete tcpsocket;
|
---|
| 287 | break;
|
---|
| 288 | }
|
---|
| 289 | }
|
---|
| 290 | }
|
---|
| 291 |
|
---|
| 292 | void AribaOmnetModule::socketEstablished(int connId, void* socket){
|
---|
| 293 |
|
---|
| 294 | logging_debug( cModule::fullPath() << " socket established" );
|
---|
| 295 |
|
---|
| 296 | TCPSocket* tcpsocket = (TCPSocket*)socket;
|
---|
| 297 | assert( tcpsocket != NULL );
|
---|
| 298 |
|
---|
| 299 | // if we have pending data for this socket
|
---|
| 300 | // we are on the client side and initiated the connection
|
---|
| 301 | // else, this is a dispatched socket on the server side
|
---|
| 302 |
|
---|
| 303 | PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
|
---|
| 304 | if( i != pendingSends.end() ){
|
---|
| 305 |
|
---|
| 306 | logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
|
---|
| 307 |
|
---|
| 308 | tcpsocket->send( i->second );
|
---|
| 309 | pendingSends.erase( i );
|
---|
| 310 |
|
---|
| 311 | } else {
|
---|
| 312 | logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
|
---|
| 313 | }
|
---|
| 314 | }
|
---|
| 315 |
|
---|
| 316 | void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
|
---|
| 317 | logging_debug( cModule::fullPath() << " socket status arrivede" );
|
---|
| 318 | }
|
---|
| 319 |
|
---|
| 320 | }} // namespace ariba, communication, internal
|
---|