00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "AribaOmnetModule.h"
00040
00041 #include "ariba/utility/system/StartupWrapper.h"
00042 using ariba::utility::StartupWrapper;
00043
00044 namespace ariba {
00045 namespace communication {
00046
00047 use_logging_cpp( AribaOmnetModule );
00048
00049 AribaOmnetModule::AribaOmnetModule(){
00050 }
00051
00052 AribaOmnetModule::~AribaOmnetModule(){
00053 }
00054
00055 void AribaOmnetModule::setServerPort(uint16_t _port){
00056 serverPort = _port;
00057 }
00058
00059 void AribaOmnetModule::start(){
00060
00061
00062
00063
00064
00065 Enter_Method_Silent();
00066
00067 serverSocket.setCallbackObject(this);
00068 serverSocket.setOutputGate(gate("tcpOut"));
00069 serverSocket.bind( IPvXAddress(), serverPort );
00070 serverSocket.listen();
00071
00072 logging_debug( cModule::fullPath() << "listening on server socket" );
00073 }
00074
00075 void AribaOmnetModule::stop(){
00076
00077 Enter_Method_Silent();
00078
00079 logging_debug( "stopping module " << cModule::fullPath() );
00080
00081 SocketMap::iterator i = sockets.begin();
00082 SocketMap::iterator iend = sockets.end();
00083
00084 for( ; i != iend; i++ )
00085 i->second->close();
00086
00087 serverSocket.close();
00088 }
00089
00090 TransportLocator::prot_t AribaOmnetModule::getId(){
00091 return 6;
00092 }
00093
00094 const vector<TransportLocator*> AribaOmnetModule::getLocators(){
00095 return vector<TransportLocator*>();
00096 }
00097
00098 int AribaOmnetModule::numInitStages() const {
00099
00100
00101 return 4;
00102 }
00103
00104 void AribaOmnetModule::initialize(int stage){
00105 if( stage != 3 ) return;
00106
00107 StartupWrapper::initSystem();
00108 StartupWrapper::initConfig( par("configfile").stringValue() );
00109
00110 StartupWrapper::insertCurrentModule( this );
00111
00112 logging_debug( "initializing " << cModule::fullPath() );
00113 logging_debug( "AribaOmnetModule " << (void*)this << " initialize" );
00114
00115 StartupInterface* service = NULL;
00116 cModuleType* type = findModuleType( ARIBA_SIMULATION_MODULE );
00117
00118 if( type != NULL ) {
00119 logging_debug( "found module type ... creating ..." );
00120 service = (StartupInterface*)type->create( ARIBA_SIMULATION_MODULE, this );
00121 } else {
00122 logging_fatal( "module type not found " << ARIBA_SIMULATION_MODULE );
00123 }
00124
00125 if( service == NULL ){
00126 logging_fatal( "no service defined for simulation. " <<
00127 "service not loaded using load-libs in omnetpp.ini, " <<
00128 " or ARIBA_SIMULATION_SERVICE() not used" );
00129 } else {
00130 StartupWrapper::startup( service );
00131 }
00132 }
00133
00134 void AribaOmnetModule::handleMessage(cMessage* msg){
00135
00136 logging_debug( cModule::fullPath() << " handling message" );
00137 bool socketfound = false;
00138
00139 SocketMap::iterator i = sockets.begin();
00140 SocketMap::iterator iend = sockets.end();
00141
00142 for( ; i != iend; i++ ){
00143 if( i->second->belongsToSocket( msg )){
00144 i->second->processMessage( msg );
00145 socketfound = true;
00146
00147 logging_debug( cModule::fullPath() << " found socket for message" );
00148 break;
00149 }
00150 }
00151
00152 if( ! socketfound ) {
00153
00154 logging_debug( cModule::fullPath() << " creating new socket for message" );
00155
00156 TCPSocket* dispatch = new TCPSocket( msg );
00157 dispatch->setCallbackObject( this, dispatch );
00158 dispatch->setOutputGate(gate("tcpOut"));
00159
00160 ostringstream o;
00161 o << dispatch->remoteAddress().str() << ":" << dispatch->remotePort();
00162
00163 sockets.insert( make_pair(o.str(), dispatch) );
00164 dispatch->processMessage( msg );
00165 }
00166 }
00167
00168 void AribaOmnetModule::finish(){
00169 StartupWrapper::shutdown();
00170 }
00171
00172 seqnum_t AribaOmnetModule::sendMessage(const Message* message){
00173
00174 Enter_Method_Silent();
00175 logging_debug( cModule::fullPath() << " sending message" );
00176
00177
00178
00179
00180
00181 Data data = data_serialize( message );
00182 const_cast<Message*>(message)->dropPayload();
00183
00184 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
00185 if( address == NULL ) return 0;
00186
00187 size_t len = data.getLength()/8;
00188 uint8_t* buffer = data.getBuffer();
00189
00190 AribaOmnetMessage* outmsg = new AribaOmnetMessage( "AribaOmnetMessage");
00191 outmsg->setPort( serverPort );
00192 outmsg->setDataArraySize( len );
00193 outmsg->setByteLength( len );
00194
00195 for( size_t i=0; i<len; i++, buffer++)
00196 outmsg->setData(i, *buffer);
00197
00198
00199
00200
00201
00202 SocketMap::iterator i = sockets.find( address->toString() );
00203 TCPSocket* connectionSocket = NULL;
00204
00205 if( i == sockets.end() ){
00206
00207 logging_debug( cModule::fullPath() <<
00208 " creating new socket, connecting and queueing message" );
00209
00210
00211
00212
00213 SocketMap::iterator ret = sockets.insert(
00214 make_pair(address->toString(), new TCPSocket()) );
00215 connectionSocket = ret->second;
00216
00217 connectionSocket->setCallbackObject( this, connectionSocket );
00218 connectionSocket->setOutputGate(gate("tcpOut"));
00219
00220 pendingSends.insert( make_pair(connectionSocket, outmsg) );
00221 connectionSocket->connect( address->getIP().c_str(), address->getPort() );
00222
00223 } else {
00224
00225 logging_debug( cModule::fullPath() << " found socket, just sending out message" );
00226 connectionSocket = i->second;
00227 connectionSocket->send( outmsg );
00228 }
00229
00230
00231
00232
00233
00234 data.release();
00235 return 0;
00236 }
00237
00238 void AribaOmnetModule::socketDataArrived(int connId, void* socket, cMessage* msg, bool urgent){
00239
00240 TCPSocket* tcpsocket = (TCPSocket*)socket;
00241
00242 AribaOmnetMessage* encap = dynamic_cast<AribaOmnetMessage*>(msg);
00243 if( encap == NULL ) return;
00244
00245 logging_debug( cModule::fullPath() << " socket data arrived " << msg->info() );
00246
00247 size_t len = encap->getDataArraySize();
00248 Data data( len*8 );
00249 uint8_t* pnt = data.getBuffer();
00250
00251 for( size_t i=0; i<len; i++, pnt++)
00252 *pnt = encap->getData( i );
00253
00254 Message* spovnetmsg = new Message(data);
00255
00256 ostringstream o;
00257 o << tcpsocket->remoteAddress().str() << ":" << encap->getPort();
00258 spovnetmsg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
00259
00260 logging_debug( cModule::fullPath() << " forwarding to base communication" );
00261 MessageProvider::sendMessageToReceivers( spovnetmsg );
00262
00263 delete encap;
00264 }
00265
00266 void AribaOmnetModule::socketFailure(int connId, void* socket, int code){
00267 logging_warn( cModule::fullPath() << " socket failure " << code );
00268 }
00269
00270 void AribaOmnetModule::socketClosed(int connId, void* socket){
00271 logging_debug( cModule::fullPath() << " socket closed" );
00272 }
00273
00274 void AribaOmnetModule::socketPeerClosed(int connId, void* socket){
00275
00276 logging_debug( cModule::fullPath() << " socket peer closed" );
00277 TCPSocket* tcpsocket = (TCPSocket*)socket;
00278
00279 SocketMap::iterator i = sockets.begin();
00280 SocketMap::iterator iend = sockets.end();
00281
00282 for( ; i != iend; i++ ){
00283
00284 if( i->second == tcpsocket ){
00285 sockets.erase( i );
00286 delete tcpsocket;
00287 break;
00288 }
00289 }
00290 }
00291
00292 void AribaOmnetModule::socketEstablished(int connId, void* socket){
00293
00294 logging_debug( cModule::fullPath() << " socket established" );
00295
00296 TCPSocket* tcpsocket = (TCPSocket*)socket;
00297 assert( tcpsocket != NULL );
00298
00299
00300
00301
00302
00303 PendingSendQueue::iterator i = pendingSends.find( tcpsocket );
00304 if( i != pendingSends.end() ){
00305
00306 logging_debug( cModule::fullPath() << " socket established ... scheduling send msg" );
00307
00308 tcpsocket->send( i->second );
00309 pendingSends.erase( i );
00310
00311 } else {
00312 logging_debug( cModule::fullPath() << " dispatch socket established ... server side" );
00313 }
00314 }
00315
00316 void AribaOmnetModule::socketStatusArrived(int connId, void* socket, TCPStatusInfo *status){
00317 logging_debug( cModule::fullPath() << " socket status arrivede" );
00318 }
00319
00320 }}