close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/modules/transport/omnet/AribaOmnetModule.cpp@ 2438

Last change on this file since 2438 was 2438, checked in by Christoph Mayer, 16 years ago

-textual

File size: 9.2 KB
RevLine 
1// [Licence]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [Licence]
38
39#include "AribaOmnetModule.h"
40
41#include "ariba/utility/system/StartupWrapper.h" // circular inclusion
42#include "ariba/interface/ServiceInterface.h"
43using ariba::utility::StartupWrapper;
44using ariba::interface::ServiceInterface;
45
46namespace ariba {
47namespace communication {
48
49use_logging_cpp( AribaOmnetModule );
50
51AribaOmnetModule::AribaOmnetModule(){
52}
53
54AribaOmnetModule::~AribaOmnetModule(){
55}
56
57void AribaOmnetModule::setServerPort(uint16_t _port){
58 serverPort = _port;
59}
60
61void AribaOmnetModule::start(){
62
63 //ostringstream o;
64 //o << (void*)this;
65 //cout << "AribaOmnetModule " + o.str() + " start" << std::endl;
66
67 Enter_Method_Silent();
68
69 serverSocket.setCallbackObject(this);
70 serverSocket.setOutputGate(gate("tcpOut"));
71 serverSocket.bind( IPvXAddress(), serverPort );
72 serverSocket.listen();
73
74 logging_debug( cModule::fullPath() << "listening on server socket" );
75}
76
77void AribaOmnetModule::stop(){
78
79 Enter_Method_Silent();
80
81 logging_debug( "stopping module " << cModule::fullPath() );
82
83 SocketMap::iterator i = sockets.begin();
84 SocketMap::iterator iend = sockets.end();
85
86 for( ; i != iend; i++ )
87 i->second->close();
88
89 serverSocket.close();
90}
91
92TransportLocator::prot_t AribaOmnetModule::getId(){
93 return 6; // TCP
94}
95
96const vector<TransportLocator*> AribaOmnetModule::getLocators(){
97 return vector<TransportLocator*>();
98}
99
100int AribaOmnetModule::numInitStages() const {
101 // the FlatNetworkConfiguration distributes the IP address in stage 3
102 // so to get the assigned IP address we init in stage 4 :)
103 return 4;
104}
105
106void AribaOmnetModule::initialize(int stage){
107 if( stage != 3 ) return;
108
109 StartupWrapper::initSystem();
110 StartupWrapper::initConfig( par("configfile").stringValue() );
111
112 StartupWrapper::insertCurrentModule( this );
113
114 logging_debug( "initializing " << cModule::fullPath() );
115 logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
116
117 StartupInterface* service = NULL;
118 cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
119
120 if( type != NULL ) {
121 logging_debug( "found module type ... creating ..." );
122 service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
123 } else {
124 logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
125 }
126
127 if( service == NULL ){
128 logging_fatal( "no service defined for simulation. " <<
129 "service not loaded using load-libs in omnetpp.ini, " <<
130 " or ARIBA_SIMULATION_SERVICE() not used" );
131 } else {
132 StartupWrapper::startup( service );
133 }
134}
135
136void AribaOmnetModule::handleMessage(cMessage* msg){
137
138 logging_debug( cModule::fullPath() << " handling message" );
139 bool socketfound = false;
140
141 SocketMap::iterator i = sockets.begin();
142 SocketMap::iterator iend = sockets.end();
143
144 for( ; i != iend; i++ ){
145 if( i->second->belongsToSocket( msg )){
146 i->second->processMessage( msg );
147 socketfound = true;
148
149 logging_debug( cModule::fullPath() << " found socket for message" );
150 break;
151 }
152 }
153
154 if( ! socketfound ) {
155
156 logging_debug( cModule::fullPath() << " creating new socket for message" );
157
158 TCPSocket* dispatch = new TCPSocket( msg );
159 dispatch->setCallbackObject( this, dispatch );
160 dispatch->setOutputGate(gate("tcpOut"));
161
162 ostringstream o;
163 o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
164
165 sockets.insert( make_pair(o.str(), dispatch) );
166 dispatch->processMessage( msg );
167 }
168}
169
170void AribaOmnetModule::finish(){
171 StartupWrapper::shutdown();
172}
173
174seqnum_t AribaOmnetModule::sendMessage(const Message* message){
175
176 Enter_Method_Silent();
177 logging_debug( cModule::fullPath() << " sending message" );
178
179 //
180 // serialize the data, get the destination address
181 //
182
183 Data data = data_serialize( message );
184 const_cast<Message*>(message)->dropPayload();
185
186 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
187 if( address == NULL ) return 0;
188
189 size_t len = data.getLength()/8;
190 uint8_t* buffer = data.getBuffer();
191
192 AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
193 outmsg->setPort( serverPort );
194 outmsg->setDataArraySize( len );
195 outmsg->setByteLength( len );
196
197 for( size_t i=0; i<len; i++, buffer++)
198 outmsg->setData(i, *buffer);
199
200 //
201 // find the socket for this endpoint
202 //
203
204 SocketMap::iterator i = sockets.find( address->toString() );
205 TCPSocket* connectionSocket = NULL;
206
207 if( i == sockets.end() ){
208
209 logging_debug( cModule::fullPath() <<
210 " creating new socket, connecting and queueing message" );
211
212 // don't have no connection yet for this endpoint
213 // initiate a connection and remember the message for later sending ...
214
215 SocketMap::iterator ret = sockets.insert(
216 make_pair(address->toString(), new TCPSocket()) );
217 connectionSocket = ret->second;
218
219 connectionSocket->setCallbackObject( this, connectionSocket );
220 connectionSocket->setOutputGate(gate("tcpOut"));
221
222 pendingSends.insert( make_pair(connectionSocket, outmsg) );
223 connectionSocket->connect( address->getIP().c_str(), address->getPort() );
224
225 } else {
226
227 logging_debug( cModule::fullPath() << " found socket, just sending out message" );
228 connectionSocket = i->second;
229 connectionSocket->send( outmsg );
230 }
231
232 //
233 // release the data and we are out!
234 //
235
236 data.release();
237 return 0;
238}
239
240void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
241
242 TCPSocket* tcpsocket = (TCPSocket*)socket;
243
244 AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
245 if( encap == NULL ) return;
246
247 logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
248
249 size_t len = encap->getDataArraySize();
250 Data data( len*8 );
251 uint8_t* pnt = data.getBuffer();
252
253 for( size_t i=0; i<len; i++, pnt++)
254 *pnt = encap->getData( i );
255
256 Message* spovnetmsg = new Message(data);
257
258 ostringstream o;
259 o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
260 spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
261
262 logging_debug( cModule::fullPath() << " forwarding to base communication" );
263 MessageProvider::sendMessageToReceivers( spovnetmsg );
264
265 delete encap;
266}
267
268void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
269 logging_warn( cModule::fullPath() << " socket failure " << code );
270}
271
272void AribaOmnetModule::socketClosed(int connId, void* socket){
273 logging_debug( cModule::fullPath() << " socket closed" );
274}
275
276void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
277
278 logging_debug( cModule::fullPath() << " socket peer closed" );
279 TCPSocket* tcpsocket = (TCPSocket*)socket;
280
281 SocketMap::iterator i = sockets.begin();
282 SocketMap::iterator iend = sockets.end();
283
284 for( ; i != iend; i++ ){
285
286 if( i->second == tcpsocket ){
287 sockets.erase( i );
288 delete tcpsocket;
289 break;
290 }
291 }
292}
293
294void AribaOmnetModule::socketEstablished(int connId, void* socket){
295
296 logging_debug( cModule::fullPath() << " socket established" );
297
298 TCPSocket* tcpsocket = (TCPSocket*)socket;
299 assert( tcpsocket != NULL );
300
301 // if we have pending data for this socket
302 // we are on the client side and initiated the connection
303 // else, this is a dispatched socket on the server side
304
305 PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
306 if( i != pendingSends.end() ){
307
308 logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
309
310 tcpsocket->send( i->second );
311 pendingSends.erase( i );
312
313 } else {
314 logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
315 }
316}
317
318void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
319 logging_debug( cModule::fullPath() << " socket status arrivede" );
320}
321
322}} // namespace ariba, communication, internal
Note: See TracBrowser for help on using the repository browser.