00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseCommunication.h"
00040 #include "networkinfo/AddressDiscovery.h"
00041
00042 #ifdef UNDERLAY_OMNET
00043 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
00044 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
00045 #include "ariba/utility/system/StartupWrapper.h"
00046
00047 using ariba::communication::AribaOmnetModule;
00048 using ariba::communication::OmnetNetworkProtocol;
00049 using ariba::utility::StartupWrapper;
00050 #endif
00051
00052 namespace ariba {
00053 namespace communication {
00054
00055 use_logging_cpp(BaseCommunication);
00056
00058 void BaseCommunication::add_endpoint( const address_v* endpoint ) {
00059 if (endpoint==NULL) return;
00060 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
00061 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
00062 ref.count++;
00063 return;
00064 }
00065 }
00066 endpoint_reference ref;
00067 ref.endpoint = endpoint->clone();
00068 ref.count = 1;
00069 remote_endpoints.push_back(ref);
00070 }
00071
00073 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
00074 if (endpoint==NULL) return;
00075 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
00076 i != remote_endpoints.end(); i++) {
00077 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
00078 i->count--;
00079 if (i->count==0) {
00080 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
00081 transport->terminate(i->endpoint);
00082 delete i->endpoint;
00083 remote_endpoints.erase(i);
00084 }
00085 return;
00086 }
00087 }
00088 }
00089
00090 BaseCommunication::BaseCommunication() {
00091 this->transport = NULL;
00092 this->started = false;
00093 }
00094
00095 BaseCommunication::~BaseCommunication(){
00096 }
00097
00098 void BaseCommunication::start() {
00099 logging_info( "Starting up ..." );
00100 currentSeqnum = 0;
00101
00102
00103 logging_info( "Creating transports ..." );
00104
00105 #ifdef UNDERLAY_OMNET
00106 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
00107 module->setServerPort( listenport );
00108
00109 transport = module;
00110 network = new OmnetNetworkProtocol( module );
00111 #else
00112 transport = new transport_peer( localDescriptor.getEndpoints() );
00113 #endif
00114
00115 logging_info( "Searching for local locators ..." );
00116 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
00117 logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
00118
00119 transport->register_listener( this );
00120 transport->start();
00121
00122 #ifndef UNDERLAY_OMNET
00123
00124 networkMonitor.registerNotification( this );
00125 #endif
00126
00127
00128 started = true;
00129 logging_info( "Started up." );
00130 }
00131
00132 void BaseCommunication::stop() {
00133 logging_info( "Stopping transports ..." );
00134
00135 transport->stop();
00136 delete transport;
00137 started = false;
00138
00139 logging_info( "Stopped." );
00140 }
00141
00142 bool BaseCommunication::isStarted(){
00143 return started;
00144 }
00145
00147 void BaseCommunication::setEndpoints( string& _endpoints ) {
00148 localDescriptor.getEndpoints().assign(_endpoints);
00149 logging_info("Setting local end-points: "
00150 << localDescriptor.getEndpoints().to_string());
00151 }
00152
00153 const LinkID BaseCommunication::establishLink(
00154 const EndpointDescriptor& descriptor,
00155 const LinkID& link_id,
00156 const QoSParameterSet& qos,
00157 const SecurityParameterSet& sec) {
00158
00159
00160 LinkID linkid = link_id;
00161
00162
00163 logging_debug( "Request to establish link" );
00164
00165
00166 if (linkid.isUnspecified()) linkid = LinkID::create();
00167
00168
00169 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
00170 LinkDescriptor* ld = new LinkDescriptor();
00171 ld->localLink = linkid;
00172 addLink( ld );
00173
00174
00175 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
00176 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
00177 baseMsg.getLocalDescriptor() = localDescriptor;
00178
00179
00180 send( &baseMsg, descriptor );
00181
00182 return linkid;
00183 }
00184
00185 void BaseCommunication::dropLink(const LinkID link) {
00186
00187 logging_debug( "Starting to drop link " + link.toString() );
00188
00189
00190 LinkDescriptor& ld = queryLocalLink( link );
00191 if( ld.isUnspecified() ) {
00192 logging_error( "Don't know the link you want to drop "+ link.toString() );
00193 return;
00194 }
00195
00196
00197 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00198 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
00199 }
00200
00201
00202 logging_debug( "Sending out link close request. for us, the link is closed now" );
00203 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
00204
00205
00206 send( &msg, ld );
00207
00208
00209 removeLink(link);
00210 }
00211
00212 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
00213
00214 logging_debug( "Sending out message to link " << lid.toString() );
00215
00216
00217 LinkDescriptor& ld = queryLocalLink(lid);
00218 if( ld.isUnspecified() ){
00219 logging_error( "Don't know the link with id " << lid.toString() );
00220 return -1;
00221 }
00222
00223
00224 if( !ld.up ) {
00225 logging_error("Can not send on link " << lid.toString() << ": link not up");
00226 return -1;
00227 }
00228
00229
00230 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
00231
00232
00233 msg.encapsulate( const_cast<Message*>(message) );
00234
00235
00236 send( &msg, ld );
00237
00238
00239 return ++currentSeqnum;
00240 }
00241
00242 const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
00243 if( link == LinkID::UNSPECIFIED){
00244 return localDescriptor;
00245 } else {
00246 LinkDescriptor& linkDesc = queryLocalLink(link);
00247 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
00248 return linkDesc.remoteEndpoint;
00249 }
00250 }
00251
00252 void BaseCommunication::registerEventListener(CommunicationEvents* _events){
00253 if( eventListener.find( _events ) == eventListener.end() )
00254 eventListener.insert( _events );
00255 }
00256
00257 void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
00258 EventListenerSet::iterator i = eventListener.find( _events );
00259 if( i != eventListener.end() )
00260 eventListener.erase( i );
00261 }
00262
00263 SystemEventType TransportEvent("Transport");
00264 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
00265
00266 class DispatchMsg {
00267 public:
00268 address_v* local;
00269 address_v* remote;
00270 Message* message;
00271 };
00272
00274 void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
00275
00276
00277 if ( event.getType() == MessageDispatchEvent ){
00278 logging_debug( "Forwarding message receiver" );
00279 DispatchMsg* dmsg = event.getData<DispatchMsg>();
00280 Message* msg = dmsg->message;
00281 receiveMessage(msg, dmsg->local, dmsg->remote);
00282 msg->dropPayload();
00283 delete dmsg;
00284 delete msg;
00285 }
00286 }
00287
00289 void BaseCommunication::receive_message(transport_protocol* transport,
00290 const address_vf local, const address_vf remote, const uint8_t* data,
00291 size_t size) {
00292
00293
00294
00295
00296 Data data_( const_cast<uint8_t*>(data), size * 8 );
00297 DispatchMsg* dmsg = new DispatchMsg();
00298
00299 Message* msg = new Message(data_);
00300 dmsg->local = local->clone();
00301 dmsg->remote = remote->clone();
00302 dmsg->message = msg;
00303
00304 SystemQueue::instance().scheduleEvent(
00305 SystemEvent( this, MessageDispatchEvent, dmsg )
00306 );
00307 }
00308
00310 void BaseCommunication::receiveMessage(const Message* message,
00311 const address_v* local, const address_v* remote ){
00312
00314 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();
00315 logging_debug( "Receiving message of type " << msg->getTypeString() );
00316
00317
00318 switch (msg->getType()) {
00319
00320
00321
00322
00323 case AribaBaseMsg::typeData: {
00324 logging_debug( "Received data message, forwarding to overlay" );
00325 if( messageReceiver != NULL ) {
00326 messageReceiver->receiveMessage(
00327 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
00328 );
00329 }
00330 break;
00331 }
00332
00333
00334
00335
00336 case AribaBaseMsg::typeLinkRequest: {
00337 logging_debug( "Received link open request" );
00338
00340 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
00341 logging_debug("Link request already received. Ignore!");
00342 break;
00343 }
00344
00346 LinkID localLink = LinkID::create();
00347 LinkID remoteLink = msg->getLocalLink();
00348 logging_debug( "local=" << local->to_string()
00349 << " remote=" << remote->to_string()
00350 );
00351
00352
00353 bool allowlink = true;
00354 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00355 allowlink &= i->onLinkRequest( localLink, local, remote );
00356 }
00357
00358
00359 if( !allowlink ){
00360 logging_warn( "Overlay denied creation of link" );
00361 return;
00362 }
00363
00364
00365 LinkDescriptor* ld = new LinkDescriptor();
00366 ld->localLink = localLink;
00367 ld->remoteLink = remoteLink;
00368 ld->localLocator = local->clone();
00369 ld->remoteLocator = remote->clone();
00370 ld->remoteEndpoint = msg->getLocalDescriptor();
00371 add_endpoint(ld->remoteLocator);
00372
00373
00374 ld->remoteEndpoint.getEndpoints().add(
00375 ld->remoteLocator, endpoint_set::Layer1_3);
00376 localDescriptor.getEndpoints().add(
00377 local, endpoint_set::Layer1_3
00378 );
00379
00380
00381 ld->up = true;
00382 addLink(ld);
00383
00384
00385 logging_debug( "Link (initiated from remote) is up with "
00386 << "local(id=" << ld->localLink.toString() << ","
00387 << "locator=" << ld->localLocator->to_string() << ") "
00388 << "remote(id=" << ld->remoteLink.toString() << ", "
00389 << "locator=" << ld->remoteLocator->to_string() << ")"
00390 );
00391
00392
00393 logging_debug( "Sending link request reply with ids "
00394 << "local=" << localLink.toString() << ", "
00395 << "remote=" << remoteLink.toString() );
00396 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
00397 reply.getLocalDescriptor() = localDescriptor;
00398 reply.getRemoteDescriptor() = ld->remoteEndpoint;
00399
00400 send( &reply, *ld );
00401
00402
00403 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
00404 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
00405 }
00406
00407
00408 break;
00409 }
00410
00411
00412
00413
00414 case AribaBaseMsg::typeLinkReply: {
00415 logging_debug( "Received link open reply for a link we initiated" );
00416
00417
00418
00419 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
00420
00421
00422 if (ld.isUnspecified()) {
00423 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
00424 return;
00425 }
00426
00427
00428 ld.remoteLink = msg->getLocalLink();
00429 ld.remoteLocator = remote->clone();
00430 localDescriptor.getEndpoints().add(
00431 msg->getRemoteDescriptor().getEndpoints(),
00432 endpoint_set::Layer1_3
00433 );
00434 ld.up = true;
00435 add_endpoint(ld.remoteLocator);
00436
00437 logging_debug( "Link is now up with local id "
00438 << ld.localLink.toString() << " and remote id "
00439 << ld.remoteLink.toString() );
00440
00441
00442
00443 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00444 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
00445 }
00446
00447
00448 break;
00449 }
00450
00451
00452
00453
00454 case AribaBaseMsg::typeLinkClose: {
00455
00456 const LinkID& localLink = msg->getRemoteLink();
00457 logging_debug( "Received link close request for link " << localLink.toString() );
00458
00459
00460 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00461 if (linkDesc.isUnspecified()) {
00462 logging_warn("Failed to find local link " << localLink.toString());
00463 return;
00464 }
00465
00466
00467 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00468 i->onLinkDown( linkDesc.localLink,
00469 linkDesc.localLocator, linkDesc.remoteLocator );
00470 }
00471
00472
00473 removeLink( localLink );
00474
00475
00476 break;
00477 }
00478
00479
00480
00481
00482 case AribaBaseMsg::typeLinkUpdate: {
00483 const LinkID& localLink = msg->getRemoteLink();
00484 logging_debug( "Received link update for link "
00485 << localLink.toString() );
00486
00487
00488 LinkDescriptor& linkDesc = queryLocalLink( localLink );
00489 if (linkDesc.isUnspecified()) {
00490 logging_warn("Failed to update local link "
00491 << localLink.toString());
00492 return;
00493 }
00494
00495
00496 const address_v* oldremote = linkDesc.remoteLocator;
00497 linkDesc.remoteLocator = remote->clone();
00498
00499
00500 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
00501 i->onLinkChanged(
00502 linkDesc.localLink,
00503 linkDesc.localLocator,
00504 linkDesc.localLocator,
00505 oldremote,
00506 linkDesc.remoteLocator
00507 );
00508 }
00509
00510
00511 break;
00512 }
00513 }
00514 delete msg;
00515 }
00516
00518 void BaseCommunication::addLink( LinkDescriptor* link ) {
00519 linkSet.push_back( link );
00520 }
00521
00523 void BaseCommunication::removeLink( const LinkID& localLink ) {
00524 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
00525 if( (*i)->localLink != localLink) continue;
00526 remove_endpoint((*i)->remoteLocator);
00527 delete *i;
00528 linkSet.erase( i );
00529 break;
00530 }
00531 }
00532
00534 BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
00535 for (int i=0; i<linkSet.size();i++)
00536 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
00537
00538 return LinkDescriptor::UNSPECIFIED();
00539 }
00540
00542 BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
00543 for (int i=0; i<linkSet.size();i++)
00544 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
00545
00546 return LinkDescriptor::UNSPECIFIED();
00547 }
00548
00549 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
00550 LinkIDs ids;
00551 for (int i=0; i<linkSet.size(); i++){
00552 if( addr == NULL ){
00553 ids.push_back( linkSet[i]->localLink );
00554 } else {
00555 if ( *linkSet[i]->remoteLocator == *addr )
00556 ids.push_back( linkSet[i]->localLink );
00557 }
00558 }
00559 return ids;
00560 }
00561
00562 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
00563
00564 #ifdef UNDERLAY_OMNET
00565
00566
00567 return
00568
00569 #endif // UNDERLAY_OMNET
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704 }
00705
00707 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) {
00708 Data data = data_serialize( message, DEFAULT_V );
00709 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8);
00710 }
00711
00713 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {
00714 if (desc.remoteLocator==NULL) return;
00715 Data data = data_serialize( message, DEFAULT_V );
00716 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8);
00717 }
00718
00719 }}