An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/communication/modules/transport/omnet/AribaOmnetModule.cpp @ 3690

Last change on this file since 3690 was 3690, checked in by mies, 14 years ago

Merged 20090512-mies-connectors changes r3472:r3689 into trunk.

File size: 9.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "AribaOmnetModule.h"
40
41#include "ariba/utility/system/StartupWrapper.h" // circular inclusion
42using ariba::utility::StartupWrapper;
43
44namespace ariba {
45namespace communication {
46
47use_logging_cpp( AribaOmnetModule );
48
49AribaOmnetModule::AribaOmnetModule(){
50}
51
52AribaOmnetModule::~AribaOmnetModule(){
53}
54
55void AribaOmnetModule::setServerPort(uint16_t _port){
56        serverPort = _port;
57}
58
59void AribaOmnetModule::start(){
60
61        //ostringstream o;
62        //o << (void*)this;
63        //cout << "AribaOmnetModule " + o.str() + " start" << std::endl;
64
65        Enter_Method_Silent();
66
67        serverSocket.setCallbackObject(this);
68        serverSocket.setOutputGate(gate("tcpOut"));
69        serverSocket.bind( IPvXAddress(), serverPort );
70        serverSocket.listen();
71
72        logging_debug( cModule::fullPath() << "listening on server socket" );
73}
74
75void AribaOmnetModule::stop(){
76
77        Enter_Method_Silent();
78
79        logging_debug( "stopping module " << cModule::fullPath() );
80
81        SocketMap::iterator i = sockets.begin();
82        SocketMap::iterator iend = sockets.end();
83
84        for( ; i != iend; i++ )
85                i->second->close();
86
87        serverSocket.close();
88}
89
90TransportLocator::prot_t AribaOmnetModule::getId(){
91        return 6; // TCP
92}
93
94const vector<TransportLocator*> AribaOmnetModule::getLocators(){
95        return vector<TransportLocator*>();
96}
97
98int AribaOmnetModule::numInitStages() const {
99        // the FlatNetworkConfiguration distributes the IP address in stage 3
100        // so to get the assigned IP address we init in stage 4 :)
101        return 4;
102}
103
104void AribaOmnetModule::initialize(int stage){
105        if( stage != 3 ) return;
106
107        StartupWrapper::initSystem();
108        StartupWrapper::initConfig( par("configfile").stringValue() );
109
110        StartupWrapper::insertCurrentModule( this );
111
112        logging_debug( "initializing " << cModule::fullPath() );
113        logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
114
115        StartupInterface* service = NULL;
116        cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
117
118        if( type != NULL ) {
119                logging_debug( "found module type ... creating ..." );
120                service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
121        } else {
122                logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
123        }
124
125        if( service == NULL ){
126                logging_fatal( "no service defined for simulation. " <<
127                                "service not loaded using load-libs in omnetpp.ini, " <<
128                                " or ARIBA_SIMULATION_SERVICE() not used" );
129        } else {
130                StartupWrapper::startup( service );
131        }
132}
133
134void AribaOmnetModule::handleMessage(cMessage* msg){
135
136        logging_debug( cModule::fullPath() << " handling message" );
137        bool socketfound = false;
138
139        SocketMap::iterator i = sockets.begin();
140        SocketMap::iterator iend = sockets.end();
141
142        for( ; i != iend; i++ ){
143                if( i->second->belongsToSocket( msg )){
144                        i->second->processMessage( msg );
145                        socketfound = true;
146
147                        logging_debug( cModule::fullPath() << " found socket for message" );
148                        break;
149                }
150        }
151
152        if( ! socketfound ) {
153
154                logging_debug( cModule::fullPath() << " creating new socket for message" );
155
156                TCPSocket* dispatch = new TCPSocket( msg );
157                dispatch->setCallbackObject( this, dispatch );
158                dispatch->setOutputGate(gate("tcpOut"));
159
160                ostringstream o;
161                o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
162
163                sockets.insert( make_pair(o.str(), dispatch) );
164                dispatch->processMessage( msg );
165        }
166}
167
168void AribaOmnetModule::finish(){
169        StartupWrapper::shutdown();
170}
171
172seqnum_t AribaOmnetModule::sendMessage(const Message* message){
173
174        Enter_Method_Silent();
175        logging_debug( cModule::fullPath() << " sending message" );
176
177        //
178        // serialize the data, get the destination address
179        //
180
181        Data data = data_serialize( message );
182        const_cast<Message*>(message)->dropPayload();
183
184        const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
185        if( address == NULL ) return 0;
186
187        size_t len = data.getLength()/8;
188        uint8_t* buffer = data.getBuffer();
189
190        AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
191        outmsg->setPort( serverPort );
192        outmsg->setDataArraySize( len );
193        outmsg->setByteLength( len );
194
195        for( size_t i=0; i<len; i++, buffer++)
196                outmsg->setData(i, *buffer);
197
198        //
199        // find the socket for this endpoint
200        //
201
202        SocketMap::iterator i = sockets.find( address->toString() );
203        TCPSocket* connectionSocket = NULL;
204
205        if( i == sockets.end() ){
206
207                logging_debug( cModule::fullPath() <<
208                        " creating new socket, connecting and queueing message" );
209
210                // don't have no connection yet for this endpoint
211                // initiate a connection and remember the message for later sending ...
212
213                SocketMap::iterator ret = sockets.insert(
214                                        make_pair(address->toString(), new TCPSocket()) );
215                connectionSocket = ret->second;
216
217                connectionSocket->setCallbackObject( this, connectionSocket );
218                connectionSocket->setOutputGate(gate("tcpOut"));
219
220                pendingSends.insert( make_pair(connectionSocket, outmsg) );
221                connectionSocket->connect( address->getIP().c_str(), address->getPort() );
222
223        } else {
224
225                logging_debug( cModule::fullPath() << " found socket, just sending out message" );
226                connectionSocket = i->second;
227                connectionSocket->send( outmsg );
228        }
229
230        //
231        // release the data and we are out!
232        //
233
234        data.release();
235        return 0;
236}
237
238void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
239
240        TCPSocket* tcpsocket = (TCPSocket*)socket;
241
242        AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
243        if( encap == NULL ) return;
244
245        logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
246
247        size_t len = encap->getDataArraySize();
248        Data data( len*8 );
249        uint8_t* pnt = data.getBuffer();
250
251        for( size_t i=0; i<len; i++, pnt++)
252                *pnt = encap->getData( i );
253
254        Message* spovnetmsg = new Message(data);
255
256        ostringstream o;
257        o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
258        spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
259
260        logging_debug( cModule::fullPath() << " forwarding to base communication" );
261        MessageProvider::sendMessageToReceivers( spovnetmsg );
262
263        delete encap;
264}
265
266void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
267        logging_warn( cModule::fullPath() << " socket failure " << code );
268}
269
270void AribaOmnetModule::socketClosed(int connId, void* socket){
271        logging_debug( cModule::fullPath() << " socket closed" );
272}
273
274void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
275
276        logging_debug( cModule::fullPath() << " socket peer closed" );
277        TCPSocket* tcpsocket = (TCPSocket*)socket;
278
279        SocketMap::iterator i = sockets.begin();
280        SocketMap::iterator iend = sockets.end();
281
282        for( ; i != iend; i++ ){
283
284                if( i->second == tcpsocket ){
285                        sockets.erase( i );
286                        delete tcpsocket;
287                        break;
288                }
289        }
290}
291
292void AribaOmnetModule::socketEstablished(int connId, void* socket){
293
294        logging_debug( cModule::fullPath() << " socket established" );
295
296        TCPSocket* tcpsocket = (TCPSocket*)socket;
297        assert( tcpsocket != NULL );
298
299        // if we have pending data for this socket
300        // we are on the client side and initiated the connection
301        // else, this is a dispatched socket on the server side
302
303        PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
304        if( i != pendingSends.end() ){
305
306                logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
307
308                tcpsocket->send( i->second );
309                pendingSends.erase( i );
310
311        } else {
312                logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
313        }
314}
315
316void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
317        logging_debug( cModule::fullPath() << " socket status arrivede" );
318}
319
320}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.