00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseOverlay.h"
00040
00041 #include "ariba/utility/misc/OvlVis.h"
00042 #include "ariba/NodeListener.h"
00043 #include "ariba/CommunicationListener.h"
00044 #include "ariba/SideportListener.h"
00045
00046 #include "ariba/overlay/messages/OverlayMsg.h"
00047 #include "ariba/overlay/messages/JoinRequest.h"
00048 #include "ariba/overlay/messages/JoinReply.h"
00049 #include "ariba/overlay/messages/LinkRequest.h"
00050
00051 namespace ariba {
00052 namespace overlay {
00053
00054 use_logging_cpp(BaseOverlay);
00055
00056 BaseOverlay::BaseOverlay()
00057 : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
00058 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED),
00059 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT){
00060 }
00061
00062 BaseOverlay::~BaseOverlay(){
00063 }
00064
00065 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){
00066
00067 bc = &_basecomm;
00068 nodeId = _nodeid;
00069
00070 logging_info("creating base overlay");
00071
00072 bc->registerMessageReceiver( this );
00073 bc->registerEventListener( this );
00074
00075 ovl.visCreate( ovlId, nodeId, string(""), string("") );
00076 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 Timer::setInterval( 5000 );
00091 Timer::start();
00092 }
00093
00094 void BaseOverlay::stop() {
00095
00096 logging_info("deleting base overlay");
00097
00098 Timer::stop();
00099 bc->unregisterMessageReceiver( this );
00100 bc->unregisterEventListener( this );
00101 }
00102
00103 void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){
00104
00105 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
00106 logging_info( "starting to join spovnet " << id.toString() <<
00107 " with nodeid " << nodeId.toString());
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118 spovnetId = id;
00119 state = BaseOverlayStateJoinInitiated;
00120
00121 initiatorLink = bc->establishLink( bootstrapEp );
00122 logging_info("join process initiated for " << id.toString() << "...");
00123 }
00124
00125 void BaseOverlay::leaveSpoVNet(){
00126
00127 logging_info( "leaving spovnet " << spovnetId );
00128 bool ret = ( state != this->BaseOverlayStateInvalid );
00129
00130 logging_debug( "dropping all auto-links ..." );
00131
00132
00133
00134
00135
00136
00137
00138
00139 vector<LinkID> servicelinks;
00140 BOOST_FOREACH( LinkPair item, linkMapping ){
00141 if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID )
00142 servicelinks.push_back( item.first );
00143 }
00144 BOOST_FOREACH( LinkID lnk, servicelinks ){
00145
00146
00147 dropLink( lnk );
00148 }
00149
00150
00151
00152 logging_debug( "leaving overlay" );
00153
00154 if( overlayInterface != NULL )
00155 overlayInterface->leaveOverlay();
00156
00157
00158
00159 if( state != BaseOverlayStateInitiator ){
00160
00161
00162 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
00163 bc->sendMessage( initiatorLink, &overMsg );
00164
00165
00166 bc->dropLink( initiatorLink );
00167 initiatorLink = LinkID::UNSPECIFIED;
00168 }
00169
00170 state = BaseOverlayStateInvalid;
00171 ovl.visShutdown( ovlId, nodeId, string("") );
00172
00173
00174 BOOST_FOREACH( NodeListener* i, nodeListeners ){
00175 if( ret ) i->onLeaveCompleted( spovnetId );
00176 else i->onLeaveFailed( spovnetId );
00177 }
00178 }
00179
00180 void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){
00181
00182
00183
00184 logging_info( "creating spovnet " + id.toString() <<
00185 " with nodeid " << nodeId.toString() );
00186
00187 spovnetId = id;
00188 state = BaseOverlayStateInitiator;
00189
00190 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
00191 if( overlayInterface == NULL ){
00192 logging_fatal( "overlay structure not supported" );
00193 state = BaseOverlayStateInvalid;
00194 return;
00195 }
00196
00197
00198 overlayInterface->joinOverlay();
00199 BOOST_FOREACH( NodeListener* i, nodeListeners )
00200 i->onJoinCompleted( spovnetId );
00201
00202 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
00203 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
00204 }
00205
00206
00208 const LinkID BaseOverlay::establishLink( const NodeID& node,
00209 const ServiceID& service, const LinkID& link_id ) {
00210
00211 if( !communicationListeners.contains( service ) ){
00212 logging_error( "no registered listener on serviceid " << service.toString() );
00213 return LinkID::UNSPECIFIED;
00214 }
00215
00216
00217 LinkID linkid = link_id;
00218
00219
00220 if (linkid.isUnspecified()) linkid = LinkID::create();
00221
00222
00223 logging_debug( "BaseOverlay called to establish link between node " <<
00224 node.toString() << " for service " << service.toString() );
00225
00226
00227 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
00228 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
00229 LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() );
00230 overlay_msg.encapsulate( &link_request_msg );
00231 pendingLinks.insert( make_pair(nonce, linkid) );
00232
00233
00234 logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() );
00235
00236
00237 overlayInterface->routeMessage( node, &overlay_msg );
00238
00239 CommunicationListener* receiver = communicationListeners.get( service );
00240 assert( receiver != NULL );
00241
00242 LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver);
00243 linkMapping.insert( make_pair(linkid, item) );
00244
00245 return linkid;
00246 }
00247
00248 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
00249 const ServiceID& service, const LinkID& linkid ){
00250
00251 if( !communicationListeners.contains( service ) ){
00252 logging_error( "no registered listener on serviceid " << service.toString() );
00253 return LinkID::UNSPECIFIED;
00254 }
00255
00256 const LinkID link = bc->establishLink( ep, linkid );
00257
00258 CommunicationListener* receiver = communicationListeners.get( service );
00259 assert( receiver != NULL );
00260
00261 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver);
00262 linkMapping.insert( make_pair(link, item) );
00263
00264 return link;
00265 }
00266
00267 void BaseOverlay::dropLink(const LinkID& link){
00268
00269 logging_debug( "baseoverlay dropping link " << link.toString() );
00270 LinkMapping::iterator i = linkMapping.find( link );
00271
00272
00273 if( i == linkMapping.end() ){
00274 logging_warn( "can't drop link, mapping unknown " << link.toString() );
00275 return;
00276 }
00277
00278 LinkItem item = i->second;
00279
00280
00281 if( item.waitingmsg.size() > 0 ){
00282
00283 logging_warn( "dropping link " << link.toString() <<
00284 " that has " << item.waitingmsg.size() << " waiting messages" );
00285
00286 item.deleteWaiting();
00287 }
00288
00289
00290 linkMapping.erase( i );
00291 bc->dropLink( link );
00292
00293
00294 item.interface->onLinkDown( link, item.node );
00295 sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId );
00296 }
00297
00298 seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){
00299
00300 logging_debug( "baseoverlay is sending data message on link " << link.toString() );
00301
00302
00303
00304
00305
00306 LinkMapping::iterator i = linkMapping.find( link );
00307 if( i == linkMapping.end() ){
00308 logging_error( "could not send message. link not found " << link.toString() );
00309 return -1;
00310 }
00311
00312 i->second.markused();
00313
00314
00315
00316
00317
00318 if( !i->second.linkup ){
00319
00320 if( i->second.autolink ){
00321 logging_info( "auto link " << link.toString() << " is not up yet, queueing message" );
00322 Data data = data_serialize( message );
00323 const_cast<Message*>(message)->dropPayload();
00324 i->second.waitingmsg.push_back( new Message(data) );
00325 } else {
00326 logging_error("link " << link.toString() << " is not up yet, dropping message" );
00327 }
00328
00329 return -1;
00330 }
00331
00332
00333
00334
00335
00336 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
00337 overmsg.encapsulate( const_cast<Message*>(message) );
00338
00339 return bc->sendMessage( link, &overmsg );
00340 }
00341
00342 seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){
00343
00344 LinkID link = LinkID::UNSPECIFIED;
00345
00346 LinkMapping::iterator i = linkMapping.begin();
00347 LinkMapping::iterator iend = linkMapping.end();
00348
00349
00350
00351
00352
00353 for( ; i != iend; i++ ){
00354 if( i->second.node == node && i->second.service == service ){
00355 link = i->second.link;
00356 break;
00357 }
00358 }
00359
00360
00361
00362
00363
00364 if( link == LinkID::UNSPECIFIED ){
00365
00366 logging_info( "no link could be found to send message to node " <<
00367 node.toString() << " for service " << service.toString() <<
00368 ". creating auto link ...");
00369
00370
00371 link = establishLink( node, service );
00372
00373
00374 LinkMapping::iterator i = linkMapping.find( link );
00375 i->second.autolink = true;
00376
00377 if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){
00378 logging_error( "failed to establish auto link to node " << node.toString() <<
00379 " for service " << service.toString() );
00380 return -1;
00381 }
00382
00383 logging_debug( "establishing autolink in progress to node "
00384 << node.toString() << " with new link-id " << link.toString() );
00385
00386 }
00387
00388 assert( link != LinkID::UNSPECIFIED );
00389
00390
00391
00392 i->second.markused();
00393
00394
00395
00396
00397
00398 return sendMessage( message, link );
00399 }
00400
00401 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const {
00402
00403 return bc->getEndpointDescriptor( link );
00404 }
00405
00406 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {
00407
00408 if( node == nodeId || node == NodeID::UNSPECIFIED )
00409 return bc->getEndpointDescriptor();
00410
00411 if( overlayInterface == NULL ){
00412 logging_error( "overlay interface not set, cannot resolve endpoint" );
00413 return EndpointDescriptor::UNSPECIFIED;
00414 }
00415
00416
00417 return overlayInterface->resolveNode( node );
00418 }
00419
00420
00421 bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){
00422 logging_debug( "binding communication listener " << listener
00423 << " on serviceid " << sid.toString() );
00424
00425 if( communicationListeners.contains( sid ) ){
00426 logging_error( "some listener already registered for service id "
00427 << sid.toString() );
00428 return false;
00429 }
00430
00431 communicationListeners.registerItem( listener, sid );
00432 return true;
00433 }
00434
00435 bool BaseOverlay::registerSidePort(SideportListener* _sideport){
00436 sideport = _sideport;
00437 _sideport->configure( this );
00438 }
00439
00440 bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){
00441 sideport = &SideportListener::DEFAULT;
00442 }
00443
00444 bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){
00445 logging_debug( "unbinding listener " << listener
00446 << " from serviceid " << sid.toString() );
00447
00448 if( !communicationListeners.contains( sid ) ){
00449 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
00450 return false;
00451 }
00452
00453 if( communicationListeners.get(sid) != listener ){
00454 logging_warn( "listener bound to service id " << sid.toString()
00455 << " is different than listener trying to unbind" );
00456 return false;
00457 }
00458
00459 communicationListeners.unregisterItem( sid );
00460 return true;
00461 }
00462
00463 bool BaseOverlay::bind(NodeListener* listener){
00464 logging_debug( "binding node listener " << listener );
00465
00466 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
00467 if( i != nodeListeners.end() ){
00468 logging_warn( "node listener " << listener << " is already bound, cannot bind" );
00469 return false;
00470 }
00471
00472 nodeListeners.push_back( listener );
00473 return true;
00474 }
00475
00476 bool BaseOverlay::unbind(NodeListener* listener){
00477 logging_debug( "unbinding node listener " << listener );
00478
00479 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
00480 if( i == nodeListeners.end() ){
00481 logging_warn( "node listener " << listener << " is not bound, cannot unbind" );
00482 return false;
00483 }
00484
00485 nodeListeners.erase( i );
00486 return true;
00487 }
00488
00489 void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
00490
00491 logging_debug( "base overlay received linkup event " + id.toString() );
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){
00503
00504 logging_info(
00505 "Join has been initiated by me and the link is now up. " <<
00506 "sending out join request for SpoVNet " << spovnetId.toString()
00507 );
00508
00509 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
00510 JoinRequest joinmsg( spovnetId, nodeId );
00511 overMsg.encapsulate( &joinmsg );
00512
00513 state = BaseOverlayStateJoinInitiated;
00514 bc->sendMessage( id, &overMsg );
00515
00516 return;
00517
00518 }
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529 LinkMapping::iterator i = linkMapping.find( id );
00530
00531 if( i == linkMapping.end() ){
00532
00533 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT );
00534 linkMapping.insert( make_pair(id, item) );
00535
00536 } else {
00537
00538 logging_debug( "sending out OverlayMessageTypeUpdate" <<
00539 " for service " << i->second.service.toString() <<
00540 " with local node id " << nodeId.toString() <<
00541 " on link " << id.toString() );
00542
00543 OverlayMsg overMsg(
00544 OverlayMsg::OverlayMessageTypeUpdate,
00545 i->second.service,
00546 nodeId
00547 );
00548
00549 bc->sendMessage( id, &overMsg );
00550 i->second.markused();
00551
00552 }
00553
00554
00555
00556
00557 }
00558
00559 void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
00560
00561 logging_debug( "link went down " << id.toString() );
00562
00563
00564
00565
00566
00567
00568 LinkMapping::iterator i = linkMapping.find( id );
00569 if( i == linkMapping.end() ) {
00570
00571
00572
00573
00574
00575 return;
00576 }
00577
00578 i->second.interface->onLinkDown( id, i->second.node );
00579 sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId );
00580
00581
00582 if( i->second.waitingmsg.size() > 0 ){
00583
00584 logging_warn( "dropping link " << id.toString() <<
00585 " that has " << i->second.waitingmsg.size() << " waiting messages" );
00586
00587 i->second.deleteWaiting();
00588 }
00589
00590 linkMapping.erase( i );
00591 }
00592
00593 void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){
00594
00595 logging_debug( "link changed " << id.toString() );
00596
00597
00598
00599
00600
00601 LinkMapping::iterator i = linkMapping.find( id );
00602 if( i == linkMapping.end() ) return;
00603
00604 i->second.interface->onLinkChanged( id, i->second.node );
00605 sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId );
00606
00607
00608
00609 i->second.markused();
00610 }
00611
00612 void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
00613
00614 logging_debug( "link failed " << id.toString() );
00615
00616
00617
00618
00619
00620 LinkMapping::iterator i = linkMapping.find( id );
00621 if( i == linkMapping.end() ) return;
00622
00623 i->second.interface->onLinkFail( id, i->second.node );
00624 sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId );
00625
00626 i->second.markused();
00627 }
00628
00629 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {
00630
00631 logging_debug( "link qos changed " << id.toString() );
00632
00633
00634
00635
00636
00637 LinkMapping::iterator i = linkMapping.find( id );
00638 if( i == linkMapping.end() ) return;
00639
00640
00641
00642
00643 i->second.markused();
00644 }
00645
00646 bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){
00647
00648
00649
00650
00651 logging_debug("received link request from " << remote->toString() << ", accepting");
00652 return true;
00653 }
00654
00655
00656 bool BaseOverlay::receiveMessage(const Message* message,
00657 const LinkID& link, const NodeID&
00658 ){
00659
00660
00661 logging_debug( "receiveMessage: " << message->toString());
00662 OverlayMsg* overlayMsg = const_cast<Message*>(message)->decapsulate<OverlayMsg>();
00663 if( overlayMsg == NULL ) return false;
00664
00665
00666 LinkMapping::iterator item = linkMapping.find( link );
00667 if( item != linkMapping.end() ) item->second.markused();
00668
00669
00670
00671
00672
00673
00674 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {
00675
00676 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );
00677
00678 const ServiceID& service = overlayMsg->getService();
00679 CommunicationListener* serviceListener = communicationListeners.get( service );
00680
00681 logging_debug( "received data for service " << service.toString() );
00682
00683 if( serviceListener != NULL )
00684 serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link );
00685
00686 return true;
00687
00688 }
00689
00690
00691
00692
00693 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) &&
00694 state == BaseOverlayStateInitiator){
00695
00696 logging_debug(
00697 "baseoverlay received message of type OverlayMessageTypeJoinRequest"
00698 );
00699
00700 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
00701 logging_info( "received join request for spovnet " <<
00702 joinReq->getSpoVNetID().toString() );
00703
00704
00705
00706 if( joinReq->getSpoVNetID() != spovnetId ){
00707 logging_error( "received join request for spovnet we don't handle " <<
00708 joinReq->getSpoVNetID().toString() );
00709 return false;
00710 }
00711
00712
00713
00714
00715
00716
00717
00718
00719 bool allow = true;
00720
00721 logging_info( "sending back join reply for spovnet " <<
00722 spovnetId.toString() << " to node " <<
00723 overlayMsg->getSourceNode().toString() <<
00724 ". result: " << (allow ? "allowed" : "denied") );
00725
00726 joiningNodes.push_back( overlayMsg->getSourceNode() );
00727
00728
00729
00730
00731
00732
00733 assert( overlayInterface != NULL );
00734 OverlayParameterSet parameters = overlayInterface->getParameters();
00735
00736 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
00737 JoinReply replyMsg( spovnetId, parameters,
00738 allow, getEndpointDescriptor() );
00739
00740 retmsg.encapsulate(&replyMsg);
00741 bc->sendMessage( link, &retmsg );
00742
00743 return true;
00744
00745 }
00746
00747
00748
00749
00750
00751 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
00752 state == BaseOverlayStateJoinInitiated){
00753
00754 logging_debug(
00755 "baseoverlay received message of type OverlayMessageTypeJoinReply");
00756
00757 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
00758 logging_info( "received spovnet join reply" );
00759
00760
00761
00762 if( replyMsg->getSpoVNetID() != spovnetId ){
00763 logging_error( "received spovnet join reply for spovnet " <<
00764 replyMsg->getSpoVNetID().toString() <<
00765 " but we wanted to join spovnet " <<
00766 spovnetId.toString() );
00767
00768
00769 return false;
00770 }
00771
00772
00773
00774 if( ! replyMsg->getJoinAllowed() ){
00775
00776 logging_error( "our join request has been denied" );
00777
00778 bc->dropLink( initiatorLink );
00779 initiatorLink = LinkID::UNSPECIFIED;
00780 state = BaseOverlayStateInvalid;
00781
00782
00783 BOOST_FOREACH( NodeListener* i, nodeListeners ){
00784 i->onJoinFailed( spovnetId );
00785 }
00786
00787 return true;
00788 }
00789
00790 logging_info( "join request has been accepted for spovnet " <<
00791 spovnetId.toString() );
00792
00793
00794
00795 overlayInterface = OverlayFactory::create( *this,
00796 replyMsg->getParam(), nodeId, this );
00797
00798 if( overlayInterface == NULL ){
00799 logging_error( "overlay structure not supported" );
00800
00801 bc->dropLink( initiatorLink );
00802 initiatorLink = LinkID::UNSPECIFIED;
00803 state = BaseOverlayStateInvalid;
00804
00805
00806 BOOST_FOREACH( NodeListener* i, nodeListeners )
00807 i->onJoinFailed( spovnetId );
00808
00809 return true;
00810 }
00811
00812
00813
00814
00815
00816 state = BaseOverlayStateCompleted;
00817 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
00818
00819 overlayInterface->createOverlay();
00820 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
00821
00822
00823 BOOST_FOREACH( NodeListener* i, nodeListeners ){
00824 i->onJoinCompleted( spovnetId );
00825 }
00826
00827 return true;
00828
00829 }
00830
00831
00832
00833
00834
00835 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){
00836
00837 logging_debug(
00838 "baseoverlay received message of type OverlayMessageTypeUpdate"
00839 );
00840
00841 const NodeID& sourcenode = overlayMsg->getSourceNode();
00842 const ServiceID& service = overlayMsg->getService();
00843
00844
00845 LinkMapping::iterator i = linkMapping.find( link );
00846 if( i == linkMapping.end() ) {
00847 logging_warn( "received overlay update message for link " <<
00848 link.toString() << " for which we have no mapping" );
00849 return false;
00850 }
00851
00852
00853 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
00854 i->second.node = sourcenode;
00855 i->second.service = service;
00856
00857
00858 if( changed ){
00859 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
00860 bc->sendMessage( link, &overMsg );
00861 }
00862
00863
00864
00865 if( !communicationListeners.contains( service ) ){
00866 logging_warn( "linkup event for service that has not been registered" );
00867 return false;
00868 }
00869
00870 CommunicationListener* iface = communicationListeners.get( service );
00871 if( iface == NULL || iface == &CommunicationListener::DEFAULT ){
00872 logging_warn( "linkup event for service that has been registered "
00873 "with a NULL interface" );
00874 return true;
00875 }
00876
00877 i->second.interface = iface;
00878 i->second.markused();
00879
00880
00881 if( !iface->onLinkRequest(sourcenode) ){
00882
00883 logging_debug("link " << link.toString() <<
00884 " has been denied by service " << service.toString() << ", dropping link");
00885
00886
00887 i->second.interface = &CommunicationListener::DEFAULT;
00888
00889 dropLink( link );
00890
00891 return true;
00892 }
00893
00894
00895
00896
00897
00898 i->second.linkup = true;
00899 logging_debug("link " << link.toString() <<
00900 " has been accepted by service " << service.toString() << " and is now up");
00901
00902 if( i->second.waitingmsg.size() > 0 ){
00903 logging_info( "sending out queued messages on link " << link.toString() );
00904
00905 BOOST_FOREACH( Message* msg, i->second.waitingmsg ){
00906 sendMessage( msg, link );
00907 delete msg;
00908 }
00909
00910 i->second.waitingmsg.clear();
00911 }
00912
00913
00914 iface->onLinkUp( link, sourcenode );
00915 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );
00916
00917 return true;
00918
00919 }
00920
00921
00922
00923
00924 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) {
00925
00926 logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" );
00927 logging_debug( "Received bye message from " <<
00928 overlayMsg->getSourceNode().toString() );
00929
00930
00931
00932
00933
00934 if( overlayMsg->getSourceNode() == spovnetInitiator ){
00935
00936 bc->dropLink( initiatorLink );
00937 initiatorLink = LinkID::UNSPECIFIED;
00938 state = BaseOverlayStateInvalid;
00939
00940 logging_fatal( "initiator ended spovnet" );
00941
00942
00943 BOOST_FOREACH( NodeListener* i, nodeListeners ){
00944 i->onLeaveFailed( spovnetId );
00945 }
00946
00947 } else {
00948
00949
00950
00951 logging_info( "node left " << overlayMsg->getSourceNode() );
00952 }
00953
00954 return true;
00955
00956 }
00957
00958
00959
00960
00961 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) {
00962 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
00963 const ServiceID& service = overlayMsg->getService();
00964 if (linkReq->isReply()) {
00965
00966
00967 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
00968 if ( i == pendingLinks.end() ) {
00969 logging_error( "Nonce not found in link request" );
00970 return true;
00971 }
00972
00973
00974 logging_debug( "LinkRequest reply received. Establishing link "
00975 << i->second << " to " << (linkReq->getEndpoint()->toString())
00976 << " for service " << service.toString()
00977 << " with nonce " << linkReq->getNonce()
00978 );
00979
00980
00981 bc->establishLink( *linkReq->getEndpoint(), i->second );
00982 } else {
00983 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
00984 LinkRequest link_request_msg(
00985 linkReq->getNonce(), &bc->getEndpointDescriptor(), true );
00986 overlay_msg.encapsulate( &link_request_msg );
00987
00988
00989 logging_debug( "Sending LinkRequest reply for link with nonce " <<
00990 linkReq->getNonce() );
00991
00992
00993 overlayInterface->routeMessage(
00994 overlayMsg->getSourceNode(), &overlay_msg
00995 );
00996 }
00997 }
00998
00999
01000
01001
01002 else {
01003
01004 logging_error( "received message in invalid state! don't know " <<
01005 "what to do with this message of type " <<
01006 overlayMsg->getType() );
01007 return false;
01008
01009 }
01010
01011 return false;
01012 }
01013
01014 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){
01015
01016 logging_debug( "broadcasting message to all known nodes " <<
01017 "in the overlay from service " + service.toString() );
01018
01019 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
01020
01021 OverlayInterface::NodeList::iterator i = nodes.begin();
01022 OverlayInterface::NodeList::iterator iend = nodes.end();
01023
01024 for( ; i != iend; i++ ){
01025 if( *i == nodeId) continue;
01026 sendMessage( message, *i, service );
01027 }
01028 }
01029
01030 vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
01031
01032
01033
01034 vector<NodeID> nodes = overlayInterface->getKnownNodes();
01035 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
01036 if( i != nodes.end() ) nodes.erase( i );
01037
01038 return nodes;
01039 }
01040
01041 void BaseOverlay::updateOvlVis( const NodeID& n ) {
01042 NodeID node = n;
01043
01044
01045
01046
01047
01048
01049 using namespace std;
01050
01051 if (node == nodeId || node.isUnspecified()) return;
01052
01053
01054 if ( node < min || min.isUnspecified() ) min = node;
01055 if ( node > max || max.isUnspecified() ) max = node;
01056
01057
01058 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
01059 if (!succ.isUnspecified() && node != succ)
01060 ovl.visDisconnect(ovlId, nodeId, succ, string(""));
01061 succ = node;
01062 ovl.visConnect(ovlId, nodeId, succ, string(""));
01063 }
01064
01065
01066 if (succ.isUnspecified() && !min.isUnspecified()) {
01067 succ = min;
01068 ovl.visConnect(ovlId, nodeId, succ, string(""));
01069 }
01070 }
01071
01072 const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
01073
01074 if( lid == LinkID::UNSPECIFIED ) return nodeId;
01075
01076 LinkMapping::const_iterator i = linkMapping.find( lid );
01077 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED;
01078 else return i->second.node;
01079 }
01080
01081 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
01082
01083 vector<LinkID> linkvector;
01084
01085 BOOST_FOREACH( LinkPair item, linkMapping ){
01086 if( item.second.node == nid || nid == NodeID::UNSPECIFIED ){
01087 linkvector.push_back( item.second.link );
01088 }
01089 }
01090
01091 return linkvector;
01092 }
01093
01094 void BaseOverlay::incomingRouteMessage(Message* msg){
01095
01096 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED );
01097 }
01098
01099 void BaseOverlay::onNodeJoin(const NodeID& node){
01100
01101 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
01102 if( i == joiningNodes.end() ) return;
01103
01104 logging_info( "node has successfully joined baseoverlay and overlay structure "
01105 << node.toString() );
01106
01107 joiningNodes.erase( i );
01108 }
01109
01110 void BaseOverlay::eventFunction(){
01111
01112 list<LinkID> oldlinks;
01113 time_t now = time(NULL);
01114
01115
01116
01117
01118
01119
01120 BOOST_FOREACH( LinkPair item, linkMapping ){
01121 if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
01122 oldlinks.push_back( item.first );
01123 }
01124
01125 BOOST_FOREACH( const LinkID lnk, oldlinks ) {
01126 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
01127 dropLink( lnk );
01128 }
01129 }
01130
01131 }}