00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseCommunication.h"
00040
00041 #ifdef UNDERLAY_OMNET
00042 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
00043 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
00044 #include "ariba/utility/system/StartupWrapper.h"
00045
00046 using ariba::communication::AribaOmnetModule;
00047 using ariba::communication::OmnetNetworkProtocol;
00048 using ariba::utility::StartupWrapper;
00049 #endif
00050
00051 namespace ariba {
00052 namespace communication {
00053
00054 use_logging_cpp(BaseCommunication);
00055 const BaseCommunication::LinkDescriptor BaseCommunication::LinkDescriptor::UNSPECIFIED;
00056
00057 BaseCommunication::BaseCommunication()
00058 : messageReceiver(NULL), network(NULL), transport(NULL){
00059 }
00060
00061 BaseCommunication::~BaseCommunication(){
00062 }
00063
00064 void BaseCommunication::start(const NetworkLocator* _locallocator, const uint16_t _listenport){
00065
00066 currentSeqnum = 0;
00067 listenport = _listenport;
00068
00069 logging_info( "starting up base communication and creating transports ..." );
00070 logging_info( "using port " << listenport );
00071
00072 #ifdef UNDERLAY_OMNET
00073 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
00074 module->setServerPort( listenport );
00075
00076 transport = module;
00077 network = new OmnetNetworkProtocol( module );
00078 #else
00079 transport = new TCPTransport( listenport );
00080 network = new IPv4NetworkProtocol();
00081 #endif
00082
00083 logging_debug( "searching for local locators ..." );
00084
00085 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
00086 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
00087 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
00088
00089
00090
00091
00092
00093 bool foundLocator = false;
00094
00095 for( ; i != iend; i++){
00096 logging_debug( "local locator found " << (*i)->toString() );
00097 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
00098
00099
00100
00101
00102
00103
00104
00105 if( *ipv4locator != IPv4Locator::LOCALHOST &&
00106 *ipv4locator != IPv4Locator::ANY &&
00107 *ipv4locator != IPv4Locator::BROADCAST ){
00108
00109 ipv4locator->setPort(listenport);
00110 localDescriptor.locator = ipv4locator;
00111 localDescriptor.isUnspec = false;
00112 logging_info( "binding to addr = " << ipv4locator->toString() );
00113 foundLocator = true;
00114 break;
00115 }
00116 }
00117
00118
00119 if( _locallocator != NULL ) {
00120 if( localDescriptor.locator != NULL) delete localDescriptor.locator;
00121 localDescriptor.locator = new IPv4Locator( IPv4Locator::fromString( _locallocator->toString()) );
00122 localDescriptor.isUnspec = false;
00123 logging_debug( "manual locator override, using locator=" <<
00124 localDescriptor.locator->toString() );
00125 foundLocator = true;
00126 }
00127
00128
00129 if( !foundLocator )
00130 logging_fatal( "did not find a useable local locator!" );
00131
00132 transport->addMessageReceiver( this );
00133 transport->start();
00134
00135 #ifndef UNDERLAY_OMNET
00136
00137
00138
00139
00140 networkMonitor.registerNotification( this );
00141 #endif
00142
00143
00144
00145
00146
00147 logging_info( "base communication started up" );
00148 }
00149
00150 void BaseCommunication::stop() {
00151
00152 logging_info( "stopping base communication and transport ..." );
00153
00154 transport->stop();
00155 delete transport;
00156 delete network;
00157
00158 logging_info( "base communication stopped" );
00159 }
00160
00161 const LinkID BaseCommunication::establishLink(
00162 const EndpointDescriptor& descriptor,
00163 const LinkID& link_id,
00164 const QoSParameterSet& qos,
00165 const SecurityParameterSet& sec) {
00166
00167
00168 LinkID linkid = link_id;
00169
00170
00171 logging_debug( "request to establish link" );
00172
00173
00174
00175
00176 if( descriptor.locator == NULL ){
00177 logging_error( "invalid destination endpoint" );
00178 return LinkID::UNSPECIFIED;
00179 }
00180
00181 if( localDescriptor.locator == NULL ){
00182 logging_error( "invalid local endpoint" );
00183 return LinkID::UNSPECIFIED;
00184 }
00185
00186 const NetworkLocator* remote = descriptor.locator;
00187 const NetworkLocator* local = localDescriptor.locator;
00188
00189
00190 if (linkid.isUnspecified()) linkid = LinkID::create();
00191 logging_debug( "creating new local descriptor entry with local link id " << linkid.toString() );
00192 LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor, false );
00193 addLink( linkDescriptor );
00194
00195
00196
00197
00198
00199
00200 logging_debug( "sending out base messages with request to open link to " << remote->toString() );
00201 AribaBaseMsg baseMsg( remote, AribaBaseMsg::LINK_STATE_OPEN_REQUEST, linkid,
00202 LinkID::UNSPECIFIED );
00203 transport->sendMessage(&baseMsg);
00204
00205 return linkid;
00206 }
00207
00208 void BaseCommunication::dropLink(const LinkID link) {
00209
00210 logging_debug( "starting to drop link " + link.toString() );
00211
00212
00213 LinkDescriptor& descriptor = queryLocalLink( link );
00214 if( descriptor.isUnspecified() ){
00215 logging_error( "don't know the link you want to drop "+ link.toString() );
00216 return;
00217 }
00218
00219
00220 logging_debug( "sending out link close request. for us, the link is closed now" );
00221 AribaBaseMsg msg(
00222 descriptor.remoteLocator,
00223 AribaBaseMsg::LINK_STATE_CLOSE_REQUEST,
00224 descriptor.localLink,
00225 descriptor.remoteLink
00226 );
00227
00228
00229 transport->sendMessage( &msg );
00230
00231
00232 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00233 i->onLinkDown( link, descriptor.localLocator, descriptor.remoteLocator );
00234 }
00235
00236
00237 removeLink(link);
00238 }
00239
00240 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
00241
00242 logging_debug( "sending out message to link " << lid.toString() );
00243
00244
00245 LinkDescriptor& linkDesc = queryLocalLink(lid);
00246 if( linkDesc.isUnspecified() ){
00247 logging_error( "don't know the link with id " << lid.toString() );
00248 return -1;
00249 }
00250
00251
00252 AribaBaseMsg msg(
00253 linkDesc.remoteLocator,
00254 AribaBaseMsg::LINK_STATE_DATA,
00255 linkDesc.localLink,
00256 linkDesc.remoteLink
00257 );
00258
00259
00260 msg.encapsulate( const_cast<Message*>(message) );
00261
00262 if( !linkDesc.linkup ){
00263 logging_error("cant send message on link " << lid.toString() << ", link not up");
00264 return -1;
00265 }
00266
00267
00268 transport->sendMessage( &msg );
00269 return ++currentSeqnum;
00270 }
00271
00272 const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
00273
00274 if( link == LinkID::UNSPECIFIED){
00275 return localDescriptor;
00276 } else {
00277 LinkDescriptor& linkDesc = queryLocalLink(link);
00278 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED;
00279 return linkDesc.remoteEndpoint;
00280 }
00281 }
00282
00283 void BaseCommunication::registerMessageReceiver(MessageReceiver* _receiver) {
00284 messageReceiver = _receiver;
00285 }
00286
00287 void BaseCommunication::unregisterMessageReceiver(MessageReceiver* _receiver) {
00288 messageReceiver = NULL;
00289 }
00290
00291 void BaseCommunication::registerEventListener(CommunicationEvents* _events){
00292
00293 if( eventListener.find( _events ) == eventListener.end() )
00294 eventListener.insert( _events );
00295 }
00296
00297 void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
00298
00299 EventListenerSet::iterator i = eventListener.find( _events );
00300 if( i != eventListener.end() )
00301 eventListener.erase( i );
00302 }
00303
00304
00305 bool BaseCommunication::receiveMessage(const Message* message, const LinkID& link, const NodeID& node){
00306
00307
00308
00309
00310
00311
00312
00313
00314 AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate<AribaBaseMsg>();
00315 logging_debug( "receiving base comm message of type " << spovmsg->getTypeString() );
00316
00317
00318
00319
00320
00321
00322 if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_DATA ){
00323
00324 logging_debug( "received data message, forwarding to overlay" );
00325
00326
00327
00328
00329
00330
00331 if( messageReceiver != NULL ) {
00332 messageReceiver->receiveMessage(
00333 spovmsg,
00334 spovmsg->getRemoteLink(),
00335 NodeID::UNSPECIFIED
00336 );
00337 }
00338
00339 }
00340
00341
00342
00343
00344
00345 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REQUEST ){
00346
00347 logging_debug( "received link open request" );
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 logging_debug( "creating local link" );
00358
00359 LinkID localLink = LinkID::create();
00360 LinkID remoteLink = spovmsg->getLocalLink();
00361
00362
00363 const NetworkLocator* localLocator = dynamic_cast<const NetworkLocator*>(localDescriptor.locator);
00364 const NetworkLocator* remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress());
00365
00366 logging_debug( "localLocator=" << localLocator->toString()
00367 << " remoteLocator=" << remoteLocator->toString());
00368
00369
00370
00371
00372
00373 bool allowlink = true;
00374 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00375 allowlink &= i->onLinkRequest( localLink, localLocator, remoteLocator );
00376 }
00377
00378 if( !allowlink ){
00379 logging_warn( "overlay denied creation of link" );
00380 return true;
00381 }
00382
00383
00384
00385
00386
00387 LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink,
00388 remoteLocator, EndpointDescriptor(remoteLocator), true);
00389
00390 logging_debug( "saving new link descriptor with " <<
00391 "[local link " << localLink.toString() << "] " <<
00392 "[local locator " << localLocator->toString() << "] " <<
00393 "[remote link " << remoteLink.toString() << "] " <<
00394 "[remote locator " << remoteLocator->toString() << "]" <<
00395 "[link up true]" );
00396
00397 addLink( linkDescriptor );
00398
00399
00400
00401
00402
00403 logging_debug( "sending back link open reply for " <<
00404 "[local link " << localLink.toString() << "] " <<
00405 "[remote link " << remoteLink.toString() << "]" );
00406
00407 AribaBaseMsg reply(remoteLocator,
00408 AribaBaseMsg::LINK_STATE_OPEN_REPLY,
00409 localLink,
00410 remoteLink);
00411
00412 transport->sendMessage( &reply );
00413
00414
00415
00416
00417
00418 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00419 i->onLinkUp( localLink, localLocator, remoteLocator );
00420 }
00421
00422 }
00423
00424
00425
00426
00427
00428 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REPLY ){
00429
00430 logging_debug( "received link open reply for a link we initiated" );
00431
00432
00433
00434 LinkDescriptor& linkDesc = queryLocalLink( spovmsg->getRemoteLink() );
00435
00436 if (linkDesc.isUnspecified()) {
00437 logging_warn("Failed to find local link " << spovmsg->getRemoteLink().toString());
00438 return false;
00439 }
00440
00441 linkDesc.remoteLink = spovmsg->getLocalLink();
00442 linkDesc.linkup = true;
00443
00444 logging_debug( "the link is now up with local link id " << spovmsg->getRemoteLink().toString() );
00445
00446
00447
00448
00449
00450
00451 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00452 i->onLinkUp( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
00453 }
00454
00455 }
00456
00457
00458
00459
00460
00461 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_CLOSE_REQUEST ){
00462
00463 const LinkID& localLink = spovmsg->getRemoteLink();
00464 logging_debug( "received link close request for link " << localLink.toString() );
00465
00466
00467
00468
00469
00470
00471
00472 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00473 if (linkDesc.isUnspecified()) {
00474 logging_warn("Failed to find local link " << localLink.toString());
00475 return false;
00476 }
00477
00478 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00479 i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
00480 }
00481
00482
00483
00484
00485
00486 removeLink( localLink );
00487
00488 }
00489
00490
00491
00492
00493
00494 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_UPDATE ){
00495
00496 const LinkID& localLink = spovmsg->getRemoteLink();
00497 logging_debug( "received link update for link " << localLink.toString() );
00498
00499
00500
00501
00502
00503 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00504 if (linkDesc.isUnspecified()) {
00505 logging_warn("Failed to update local link " << localLink.toString());
00506 return false;
00507 }
00508
00509
00510
00511
00512
00513 const NetworkLocator* oldremote = linkDesc.remoteLocator;
00514 linkDesc.remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress());
00515
00516
00517
00518
00519
00520 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00521 i->onLinkChanged(
00522 linkDesc.localLink,
00523 linkDesc.localLocator,
00524 linkDesc.localLocator,
00525 oldremote,
00526 linkDesc.remoteLocator
00527 );
00528 }
00529
00530 }
00531
00532 return true;
00533 }
00534
00535 void BaseCommunication::addLink( const LinkDescriptor& link ) {
00536 linkSet.push_back( link );
00537 }
00538
00539 void BaseCommunication::removeLink( const LinkID& localLink ) {
00540
00541 LinkSet::iterator i = linkSet.begin();
00542 LinkSet::iterator iend = linkSet.end();
00543
00544 for( ; i != iend; i++){
00545 if( (*i).localLink != localLink) continue;
00546
00547 linkSet.erase( i );
00548 break;
00549 }
00550 }
00551
00552 BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
00553 for (int i=0; i<linkSet.size();i++)
00554 if (linkSet[i].localLink == link) return (LinkDescriptor&)linkSet[i];
00555 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED;
00556 }
00557
00558 BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
00559 for (int i=0; i<linkSet.size();i++)
00560 if (linkSet[i].remoteLink == link) return (LinkDescriptor&)linkSet[i];
00561 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED;
00562 }
00563
00564 LinkIDs BaseCommunication::getLocalLinks( const EndpointDescriptor& ep ) const {
00565 LinkIDs ids;
00566
00567 for (int i=0; i<linkSet.size(); i++){
00568 if( ep == EndpointDescriptor::UNSPECIFIED ){
00569 ids.push_back( linkSet[i].localLink );
00570 } else {
00571 if ( linkSet[i].remoteLocator == ep.locator )
00572 ids.push_back( linkSet[i].localLink );
00573 }
00574 }
00575
00576 return ids;
00577 }
00578
00579 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
00580
00581 #ifdef UNDERLAY_OMNET
00582
00583
00584 return
00585
00586 #endif // UNDERLAY_OMNET
00587
00588
00589
00590
00591
00592
00593 if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
00594 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
00595
00596 logging_info( "base communication is handling network address changes" );
00597
00598
00599
00600
00601
00602 NetworkInformation networkInformation;
00603 AddressInformation addressInformation;
00604
00605 NetworkInterfaceList interfaces = networkInformation.getInterfaces();
00606 AddressList addresses;
00607
00608 for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
00609 AddressList newaddr = addressInformation.getAddresses(*i);
00610 addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
00611 }
00612
00613
00614
00615
00616
00617
00618 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
00619 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
00620 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
00621
00622
00623
00624
00625
00626 EndpointDescriptor oldLocalDescriptor( localDescriptor );
00627
00628
00629
00630
00631
00632
00633
00634 bool foundLocator = false;
00635 bool changedLocator = false;
00636
00637 for( ; i != iend; i++){
00638 logging_debug( "local locator found " << (*i)->toString() );
00639 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
00640
00641 if( *ipv4locator != IPv4Locator::LOCALHOST &&
00642 *ipv4locator != IPv4Locator::ANY &&
00643 *ipv4locator != IPv4Locator::BROADCAST ){
00644
00645 ipv4locator->setPort( listenport );
00646 changedLocator = *localDescriptor.locator != *ipv4locator;
00647 localDescriptor.locator = ipv4locator;
00648 logging_info( "binding to addr = " << ipv4locator->toString() );
00649 foundLocator = true;
00650 break;
00651 }
00652 }
00653
00654
00655
00656
00657
00658 if( !foundLocator ){
00659 changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
00660 localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
00661 ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
00662 logging_info( "found no good local lcoator, binding to addr = " <<
00663 localDescriptor.locator->toString() );
00664 }
00665
00666
00667
00668
00669
00670
00671
00672 if( changedLocator ){
00673
00674 logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
00675 ", resettings connections that end at old locator " <<
00676 oldLocalDescriptor.toString());
00677
00678 LinkSet::iterator i = linkSet.begin();
00679 LinkSet::iterator iend = linkSet.end();
00680
00681 for( ; i != iend; i++ ){
00682
00683 logging_debug( "checking connection for locator change: " <<
00684 " local " << (*i).localLocator->toString() <<
00685 " old " << oldLocalDescriptor.locator->toString() );
00686
00687 if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
00688
00689 logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
00690 transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
00691
00692 (*i).localLocator = localDescriptor.locator;
00693 }
00694 }
00695
00696
00697 usleep( 500000 );
00698
00699 } else {
00700
00701 logging_debug( "locator has not changed, not resetting connections" );
00702
00703 }
00704
00705
00706
00707
00708
00709
00710
00711 LinkSet::iterator iAffected = linkSet.begin();
00712 LinkSet::iterator endAffected = linkSet.end();
00713
00714 for( ; iAffected != endAffected; iAffected++ ){
00715 LinkDescriptor descr = *iAffected;
00716 logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
00717
00718 AribaBaseMsg updateMsg( descr.remoteLocator,
00719 AribaBaseMsg::LINK_STATE_UPDATE,
00720 descr.localLink, descr.remoteLink );
00721
00722 transport->sendMessage( &updateMsg );
00723 }
00724 }
00725
00726 }}