close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/modules/transport/omnet/AribaOmnetModule.cpp@ 4608

Last change on this file since 4608 was 3690, checked in by mies, 16 years ago

Merged 20090512-mies-connectors changes r3472:r3689 into trunk.

File size: 9.1 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "AribaOmnetModule.h"
40
41#include "ariba/utility/system/StartupWrapper.h" // circular inclusion
42using ariba::utility::StartupWrapper;
43
44namespace ariba {
45namespace communication {
46
47use_logging_cpp( AribaOmnetModule );
48
49AribaOmnetModule::AribaOmnetModule(){
50}
51
52AribaOmnetModule::~AribaOmnetModule(){
53}
54
55void AribaOmnetModule::setServerPort(uint16_t _port){
56 serverPort = _port;
57}
58
59void AribaOmnetModule::start(){
60
61 //ostringstream o;
62 //o << (void*)this;
63 //cout << "AribaOmnetModule " + o.str() + " start" << std::endl;
64
65 Enter_Method_Silent();
66
67 serverSocket.setCallbackObject(this);
68 serverSocket.setOutputGate(gate("tcpOut"));
69 serverSocket.bind( IPvXAddress(), serverPort );
70 serverSocket.listen();
71
72 logging_debug( cModule::fullPath() << "listening on server socket" );
73}
74
75void AribaOmnetModule::stop(){
76
77 Enter_Method_Silent();
78
79 logging_debug( "stopping module " << cModule::fullPath() );
80
81 SocketMap::iterator i = sockets.begin();
82 SocketMap::iterator iend = sockets.end();
83
84 for( ; i != iend; i++ )
85 i->second->close();
86
87 serverSocket.close();
88}
89
90TransportLocator::prot_t AribaOmnetModule::getId(){
91 return 6; // TCP
92}
93
94const vector<TransportLocator*> AribaOmnetModule::getLocators(){
95 return vector<TransportLocator*>();
96}
97
98int AribaOmnetModule::numInitStages() const {
99 // the FlatNetworkConfiguration distributes the IP address in stage 3
100 // so to get the assigned IP address we init in stage 4 :)
101 return 4;
102}
103
104void AribaOmnetModule::initialize(int stage){
105 if( stage != 3 ) return;
106
107 StartupWrapper::initSystem();
108 StartupWrapper::initConfig( par("configfile").stringValue() );
109
110 StartupWrapper::insertCurrentModule( this );
111
112 logging_debug( "initializing " << cModule::fullPath() );
113 logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
114
115 StartupInterface* service = NULL;
116 cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
117
118 if( type != NULL ) {
119 logging_debug( "found module type ... creating ..." );
120 service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
121 } else {
122 logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
123 }
124
125 if( service == NULL ){
126 logging_fatal( "no service defined for simulation. " <<
127 "service not loaded using load-libs in omnetpp.ini, " <<
128 " or ARIBA_SIMULATION_SERVICE() not used" );
129 } else {
130 StartupWrapper::startup( service );
131 }
132}
133
134void AribaOmnetModule::handleMessage(cMessage* msg){
135
136 logging_debug( cModule::fullPath() << " handling message" );
137 bool socketfound = false;
138
139 SocketMap::iterator i = sockets.begin();
140 SocketMap::iterator iend = sockets.end();
141
142 for( ; i != iend; i++ ){
143 if( i->second->belongsToSocket( msg )){
144 i->second->processMessage( msg );
145 socketfound = true;
146
147 logging_debug( cModule::fullPath() << " found socket for message" );
148 break;
149 }
150 }
151
152 if( ! socketfound ) {
153
154 logging_debug( cModule::fullPath() << " creating new socket for message" );
155
156 TCPSocket* dispatch = new TCPSocket( msg );
157 dispatch->setCallbackObject( this, dispatch );
158 dispatch->setOutputGate(gate("tcpOut"));
159
160 ostringstream o;
161 o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
162
163 sockets.insert( make_pair(o.str(), dispatch) );
164 dispatch->processMessage( msg );
165 }
166}
167
168void AribaOmnetModule::finish(){
169 StartupWrapper::shutdown();
170}
171
172seqnum_t AribaOmnetModule::sendMessage(const Message* message){
173
174 Enter_Method_Silent();
175 logging_debug( cModule::fullPath() << " sending message" );
176
177 //
178 // serialize the data, get the destination address
179 //
180
181 Data data = data_serialize( message );
182 const_cast<Message*>(message)->dropPayload();
183
184 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
185 if( address == NULL ) return 0;
186
187 size_t len = data.getLength()/8;
188 uint8_t* buffer = data.getBuffer();
189
190 AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
191 outmsg->setPort( serverPort );
192 outmsg->setDataArraySize( len );
193 outmsg->setByteLength( len );
194
195 for( size_t i=0; i<len; i++, buffer++)
196 outmsg->setData(i, *buffer);
197
198 //
199 // find the socket for this endpoint
200 //
201
202 SocketMap::iterator i = sockets.find( address->toString() );
203 TCPSocket* connectionSocket = NULL;
204
205 if( i == sockets.end() ){
206
207 logging_debug( cModule::fullPath() <<
208 " creating new socket, connecting and queueing message" );
209
210 // don't have no connection yet for this endpoint
211 // initiate a connection and remember the message for later sending ...
212
213 SocketMap::iterator ret = sockets.insert(
214 make_pair(address->toString(), new TCPSocket()) );
215 connectionSocket = ret->second;
216
217 connectionSocket->setCallbackObject( this, connectionSocket );
218 connectionSocket->setOutputGate(gate("tcpOut"));
219
220 pendingSends.insert( make_pair(connectionSocket, outmsg) );
221 connectionSocket->connect( address->getIP().c_str(), address->getPort() );
222
223 } else {
224
225 logging_debug( cModule::fullPath() << " found socket, just sending out message" );
226 connectionSocket = i->second;
227 connectionSocket->send( outmsg );
228 }
229
230 //
231 // release the data and we are out!
232 //
233
234 data.release();
235 return 0;
236}
237
238void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
239
240 TCPSocket* tcpsocket = (TCPSocket*)socket;
241
242 AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
243 if( encap == NULL ) return;
244
245 logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
246
247 size_t len = encap->getDataArraySize();
248 Data data( len*8 );
249 uint8_t* pnt = data.getBuffer();
250
251 for( size_t i=0; i<len; i++, pnt++)
252 *pnt = encap->getData( i );
253
254 Message* spovnetmsg = new Message(data);
255
256 ostringstream o;
257 o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
258 spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
259
260 logging_debug( cModule::fullPath() << " forwarding to base communication" );
261 MessageProvider::sendMessageToReceivers( spovnetmsg );
262
263 delete encap;
264}
265
266void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
267 logging_warn( cModule::fullPath() << " socket failure " << code );
268}
269
270void AribaOmnetModule::socketClosed(int connId, void* socket){
271 logging_debug( cModule::fullPath() << " socket closed" );
272}
273
274void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
275
276 logging_debug( cModule::fullPath() << " socket peer closed" );
277 TCPSocket* tcpsocket = (TCPSocket*)socket;
278
279 SocketMap::iterator i = sockets.begin();
280 SocketMap::iterator iend = sockets.end();
281
282 for( ; i != iend; i++ ){
283
284 if( i->second == tcpsocket ){
285 sockets.erase( i );
286 delete tcpsocket;
287 break;
288 }
289 }
290}
291
292void AribaOmnetModule::socketEstablished(int connId, void* socket){
293
294 logging_debug( cModule::fullPath() << " socket established" );
295
296 TCPSocket* tcpsocket = (TCPSocket*)socket;
297 assert( tcpsocket != NULL );
298
299 // if we have pending data for this socket
300 // we are on the client side and initiated the connection
301 // else, this is a dispatched socket on the server side
302
303 PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
304 if( i != pendingSends.end() ){
305
306 logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
307
308 tcpsocket->send( i->second );
309 pendingSends.erase( i );
310
311 } else {
312 logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
313 }
314}
315
316void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
317 logging_debug( cModule::fullPath() << " socket status arrivede" );
318}
319
320}} // namespace ariba, communication, internal
Note: See TracBrowser for help on using the repository browser.