00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseCommunication.h"
00040
00041 #include "networkinfo/AddressDiscovery.h"
00042 #include "ariba/utility/types/PeerID.h"
00043
00044 #ifdef UNDERLAY_OMNET
00045 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
00046 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
00047 #include "ariba/utility/system/StartupWrapper.h"
00048
00049 using ariba::communication::AribaOmnetModule;
00050 using ariba::communication::OmnetNetworkProtocol;
00051 using ariba::utility::StartupWrapper;
00052 #endif
00053
00054 namespace ariba {
00055 namespace communication {
00056
00057 using ariba::utility::PeerID;
00058
00059 use_logging_cpp(BaseCommunication);
00060
00062 void BaseCommunication::add_endpoint( const address_v* endpoint ) {
00063 if (endpoint==NULL) return;
00064 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
00065 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
00066 ref.count++;
00067 return;
00068 }
00069 }
00070 endpoint_reference ref;
00071 ref.endpoint = endpoint->clone();
00072 ref.count = 1;
00073 remote_endpoints.push_back(ref);
00074 }
00075
00077 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
00078 if (endpoint==NULL) return;
00079 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
00080 i != remote_endpoints.end(); i++) {
00081 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
00082 i->count--;
00083 if (i->count==0) {
00084 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
00085 transport->terminate(i->endpoint);
00086 delete i->endpoint;
00087 remote_endpoints.erase(i);
00088 }
00089 return;
00090 }
00091 }
00092 }
00093
00094 BaseCommunication::BaseCommunication() {
00095 this->transport = NULL;
00096 this->started = false;
00097 }
00098
00099 BaseCommunication::~BaseCommunication(){
00100 }
00101
00102 void BaseCommunication::start() {
00103 logging_info( "Starting up ..." );
00104 currentSeqnum = 0;
00105
00106
00107 localDescriptor.getPeerId() = PeerID::random();
00108 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
00109
00110
00111 logging_info( "Creating transports ..." );
00112
00113 #ifdef UNDERLAY_OMNET
00114 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
00115 module->setServerPort( listenport );
00116
00117 transport = module;
00118 network = new OmnetNetworkProtocol( module );
00119 #else
00120 transport = new transport_peer( localDescriptor.getEndpoints() );
00121 #endif
00122
00123 logging_info( "Searching for local locators ..." );
00128 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
00129 logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
00130
00131 transport->register_listener( this );
00132 transport->start();
00133
00134 #ifndef UNDERLAY_OMNET
00135
00136 networkMonitor.registerNotification( this );
00137 #endif
00138
00139
00140 started = true;
00141 logging_info( "Started up." );
00142 }
00143
00144 void BaseCommunication::stop() {
00145 logging_info( "Stopping transports ..." );
00146
00147 transport->stop();
00148 delete transport;
00149 started = false;
00150
00151 logging_info( "Stopped." );
00152 }
00153
00154 bool BaseCommunication::isStarted(){
00155 return started;
00156 }
00157
00159 void BaseCommunication::setEndpoints( string& _endpoints ) {
00160 localDescriptor.getEndpoints().assign(_endpoints);
00161 logging_info("Setting local end-points: "
00162 << localDescriptor.getEndpoints().to_string());
00163 }
00164
00165 const LinkID BaseCommunication::establishLink(
00166 const EndpointDescriptor& descriptor,
00167 const LinkID& link_id,
00168 const QoSParameterSet& qos,
00169 const SecurityParameterSet& sec) {
00170
00171
00172 LinkID linkid = link_id;
00173
00174
00175 logging_debug( "Request to establish link" );
00176
00177
00178 if (linkid.isUnspecified()) linkid = LinkID::create();
00179
00180
00181 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
00182 LinkDescriptor* ld = new LinkDescriptor();
00183 ld->localLink = linkid;
00184 addLink( ld );
00185
00186
00187 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
00188 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
00189 baseMsg.getLocalDescriptor() = localDescriptor;
00190 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
00191
00192
00193 send( &baseMsg, descriptor );
00194
00195 return linkid;
00196 }
00197
00198 void BaseCommunication::dropLink(const LinkID link) {
00199
00200 logging_debug( "Starting to drop link " + link.toString() );
00201
00202
00203 LinkDescriptor& ld = queryLocalLink( link );
00204 if( ld.isUnspecified() ) {
00205 logging_error( "Don't know the link you want to drop "+ link.toString() );
00206 return;
00207 }
00208
00209
00210 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00211 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
00212 }
00213
00214
00215 logging_debug( "Sending out link close request. for us, the link is closed now" );
00216 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
00217
00218
00219 send( &msg, ld );
00220
00221
00222 removeLink(link);
00223 }
00224
00225 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
00226
00227 logging_debug( "Sending out message to link " << lid.toString() );
00228
00229
00230 LinkDescriptor& ld = queryLocalLink(lid);
00231 if( ld.isUnspecified() ){
00232 logging_error( "Don't know the link with id " << lid.toString() );
00233 return -1;
00234 }
00235
00236
00237 if( !ld.up ) {
00238 logging_error("Can not send on link " << lid.toString() << ": link not up");
00239 return -1;
00240 }
00241
00242
00243 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
00244
00245
00246 msg.encapsulate( const_cast<Message*>(message) );
00247
00248
00249 send( &msg, ld );
00250
00251
00252 return ++currentSeqnum;
00253 }
00254
00255 const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
00256 if( link.isUnspecified() ){
00257 return localDescriptor;
00258 } else {
00259 LinkDescriptor& linkDesc = queryLocalLink(link);
00260 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
00261 return linkDesc.remoteEndpoint;
00262 }
00263 }
00264
00265 void BaseCommunication::registerEventListener(CommunicationEvents* _events){
00266 if( eventListener.find( _events ) == eventListener.end() )
00267 eventListener.insert( _events );
00268 }
00269
00270 void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
00271 EventListenerSet::iterator i = eventListener.find( _events );
00272 if( i != eventListener.end() )
00273 eventListener.erase( i );
00274 }
00275
00276 SystemEventType TransportEvent("Transport");
00277 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
00278
00279 class DispatchMsg {
00280 public:
00281 DispatchMsg() : local(NULL), remote(NULL), message(NULL) {}
00282 address_v* local;
00283 address_v* remote;
00284 Message* message;
00285 };
00286
00288 void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
00289
00290
00291 if ( event.getType() == MessageDispatchEvent ){
00292 logging_debug( "Forwarding message receiver" );
00293 DispatchMsg* dmsg = event.getData<DispatchMsg>();
00294 Message* msg = dmsg->message;
00295 receiveMessage(msg, dmsg->local, dmsg->remote);
00296 msg->dropPayload();
00297 delete dmsg->local;
00298 delete dmsg->remote;
00299 delete msg;
00300 delete dmsg;
00301 }
00302 }
00303
00305 void BaseCommunication::receive_message(transport_protocol* transport,
00306 const address_vf local, const address_vf remote, const uint8_t* data,
00307 size_t size) {
00308
00309
00310
00311
00312 Data data_( const_cast<uint8_t*>(data), size * 8 );
00313 DispatchMsg* dmsg = new DispatchMsg();
00314
00315 Message* msg = new Message(data_);
00316 dmsg->local = local->clone();
00317 dmsg->remote = remote->clone();
00318 dmsg->message = msg;
00319
00320 SystemQueue::instance().scheduleEvent(
00321 SystemEvent( this, MessageDispatchEvent, dmsg )
00322 );
00323 }
00324
00326 void BaseCommunication::receiveMessage(const Message* message,
00327 const address_v* local, const address_v* remote ){
00328
00330 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();
00331 logging_debug( "Receiving message of type " << msg->getTypeString() );
00332
00333
00334 switch (msg->getType()) {
00335
00336
00337
00338
00339 case AribaBaseMsg::typeData: {
00340 logging_debug( "Received data message, forwarding to overlay" );
00341 if( messageReceiver != NULL ) {
00342 messageReceiver->receiveMessage(
00343 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
00344 );
00345 }
00346 break;
00347 }
00348
00349
00350
00351
00352 case AribaBaseMsg::typeLinkRequest: {
00353 logging_debug( "Received link open request" );
00354
00356 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
00357 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
00358 logging_info("Received link request for "
00359 << msg->getRemoteDescriptor().getPeerId().toString()
00360 << "but i'm "
00361 << localDescriptor.getPeerId()
00362 << ": Ignoring!");
00363 break;
00364 }
00365
00367 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
00368 logging_debug("Link request already received. Ignore!");
00369 break;
00370 }
00371
00373 LinkID localLink = LinkID::create();
00374 LinkID remoteLink = msg->getLocalLink();
00375 logging_debug( "local=" << local->to_string()
00376 << " remote=" << remote->to_string()
00377 );
00378
00379
00380 bool allowlink = true;
00381 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00382 allowlink &= i->onLinkRequest( localLink, local, remote );
00383 }
00384
00385
00386 if( !allowlink ){
00387 logging_warn( "Overlay denied creation of link" );
00388 delete msg;
00389 return;
00390 }
00391
00392
00393 LinkDescriptor* ld = new LinkDescriptor();
00394 ld->localLink = localLink;
00395 ld->remoteLink = remoteLink;
00396 ld->localLocator = local->clone();
00397 ld->remoteLocator = remote->clone();
00398 ld->remoteEndpoint = msg->getLocalDescriptor();
00399 add_endpoint(ld->remoteLocator);
00400
00401
00402 ld->remoteEndpoint.getEndpoints().add(
00403 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
00404 localDescriptor.getEndpoints().add(
00405 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
00406
00407
00408 ld->up = true;
00409 addLink(ld);
00410
00411
00412 logging_debug( "Link (initiated from remote) is up with "
00413 << "local(id=" << ld->localLink.toString() << ","
00414 << "locator=" << ld->localLocator->to_string() << ") "
00415 << "remote(id=" << ld->remoteLink.toString() << ", "
00416 << "locator=" << ld->remoteLocator->to_string() << ")"
00417 );
00418
00419
00420 logging_debug( "Sending link request reply with ids "
00421 << "local=" << localLink.toString() << ", "
00422 << "remote=" << remoteLink.toString() );
00423 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
00424 reply.getLocalDescriptor() = localDescriptor;
00425 reply.getRemoteDescriptor() = ld->remoteEndpoint;
00426
00427 send( &reply, *ld );
00428
00429
00430 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00431 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
00432 }
00433
00434
00435 break;
00436 }
00437
00438
00439
00440
00441 case AribaBaseMsg::typeLinkReply: {
00442 logging_debug( "Received link open reply for a link we initiated" );
00443
00444
00445
00446 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
00447
00448
00449 if (ld.isUnspecified()) {
00450 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
00451 delete msg;
00452 return;
00453 }
00454
00455
00456 ld.remoteLink = msg->getLocalLink();
00457 ld.remoteLocator = remote->clone();
00458 ld.remoteEndpoint.getEndpoints().add(
00459 msg->getLocalDescriptor().getEndpoints(),
00460 endpoint_set::Layer1_4
00461 );
00462
00463 localDescriptor.getEndpoints().add(
00464 msg->getRemoteDescriptor().getEndpoints(),
00465 endpoint_set::Layer1_3
00466 );
00467 ld.up = true;
00468 add_endpoint(ld.remoteLocator);
00469
00470 logging_debug( "Link is now up with local id "
00471 << ld.localLink.toString() << " and remote id "
00472 << ld.remoteLink.toString() );
00473
00474
00475
00476 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00477 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
00478 }
00479
00480
00481 break;
00482 }
00483
00484
00485
00486
00487 case AribaBaseMsg::typeLinkClose: {
00488
00489 const LinkID& localLink = msg->getRemoteLink();
00490 logging_debug( "Received link close request for link " << localLink.toString() );
00491
00492
00493 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00494 if (linkDesc.isUnspecified()) {
00495 logging_warn("Failed to find local link " << localLink.toString());
00496 delete msg;
00497 return;
00498 }
00499
00500
00501 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00502 i->onLinkDown( linkDesc.localLink,
00503 linkDesc.localLocator, linkDesc.remoteLocator );
00504 }
00505
00506
00507 removeLink( localLink );
00508
00509
00510 break;
00511 }
00512
00513
00514
00515
00516 case AribaBaseMsg::typeLinkUpdate: {
00517 const LinkID& localLink = msg->getRemoteLink();
00518 logging_debug( "Received link update for link "
00519 << localLink.toString() );
00520
00521
00522 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00523 if (linkDesc.isUnspecified()) {
00524 logging_warn("Failed to update local link "
00525 << localLink.toString());
00526 delete msg;
00527 return;
00528 }
00529
00530
00531 const address_v* oldremote = linkDesc.remoteLocator;
00532 linkDesc.remoteLocator = remote->clone();
00533
00534
00535 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00536 i->onLinkChanged(
00537 linkDesc.localLink,
00538 linkDesc.localLocator,
00539 linkDesc.localLocator,
00540 oldremote,
00541 linkDesc.remoteLocator
00542 );
00543 }
00544
00545
00546 break;
00547 }
00548 }
00549
00550 delete msg;
00551 }
00552
00554 void BaseCommunication::addLink( LinkDescriptor* link ) {
00555 linkSet.push_back( link );
00556 }
00557
00559 void BaseCommunication::removeLink( const LinkID& localLink ) {
00560 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
00561 if( (*i)->localLink != localLink) continue;
00562 remove_endpoint((*i)->remoteLocator);
00563 delete *i;
00564 linkSet.erase( i );
00565 break;
00566 }
00567 }
00568
00570 BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
00571 for (size_t i=0; i<linkSet.size();i++)
00572 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
00573
00574 return LinkDescriptor::UNSPECIFIED();
00575 }
00576
00578 BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
00579 for (size_t i=0; i<linkSet.size();i++)
00580 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
00581
00582 return LinkDescriptor::UNSPECIFIED();
00583 }
00584
00585 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
00586 LinkIDs ids;
00587 for (size_t i=0; i<linkSet.size(); i++){
00588 if( addr == NULL ){
00589 ids.push_back( linkSet[i]->localLink );
00590 } else {
00591 if ( *linkSet[i]->remoteLocator == *addr )
00592 ids.push_back( linkSet[i]->localLink );
00593 }
00594 }
00595 return ids;
00596 }
00597
00598 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
00599
00600 #ifdef UNDERLAY_OMNET
00601
00602
00603 return
00604
00605 #endif // UNDERLAY_OMNET
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740 }
00741
00743 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) {
00744 Data data = data_serialize( message, DEFAULT_V );
00745 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8);
00746 data.release();
00747 }
00748
00750 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {
00751 if (desc.remoteLocator==NULL) return;
00752 Data data = data_serialize( message, DEFAULT_V );
00753 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8);
00754 data.release();
00755 }
00756
00757 }}