00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "BaseOverlay.h"
00040
00041 #include <sstream>
00042 #include <iostream>
00043 #include <string>
00044 #include <boost/foreach.hpp>
00045
00046 #include "ariba/NodeListener.h"
00047 #include "ariba/CommunicationListener.h"
00048 #include "ariba/SideportListener.h"
00049
00050 #include "ariba/overlay/LinkDescriptor.h"
00051
00052 #include "ariba/overlay/messages/OverlayMsg.h"
00053 #include "ariba/overlay/messages/JoinRequest.h"
00054 #include "ariba/overlay/messages/JoinReply.h"
00055
00056 #include "ariba/utility/misc/OvlVis.h"
00057
00058 namespace ariba {
00059 namespace overlay {
00060
00061
00062
00063
00064
00065 CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
00066 if( !communicationListeners.contains( service ) ) {
00067 logging_error( "No listener found for service " << service.toString() );
00068 return NULL;
00069 }
00070 CommunicationListener* listener = communicationListeners.get( service );
00071 assert( listener != NULL );
00072 return listener;
00073 }
00074
00075
00076
00077 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
00078 BOOST_FOREACH( LinkDescriptor* lp, links )
00079 if ((communication ? lp->communicationId : lp->overlayId) == link)
00080 return lp;
00081 return NULL;
00082 }
00083
00084 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
00085 BOOST_FOREACH( const LinkDescriptor* lp, links )
00086 if ((communication ? lp->communicationId : lp->overlayId) == link)
00087 return lp;
00088 return NULL;
00089 }
00090
00092 void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
00093 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
00094 LinkDescriptor* ld = *i;
00095 if ((communication ? ld->communicationId : ld->overlayId) == link) {
00096 delete ld;
00097 links.erase(i);
00098 break;
00099 }
00100 }
00101 }
00102
00104 LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
00105 LinkDescriptor* desc = getDescriptor( link );
00106 if ( desc == NULL ) {
00107 desc = new LinkDescriptor();
00108 if (!link.isUnspecified()) desc->overlayId = link;
00109 links.push_back(desc);
00110 }
00111 return desc;
00112 }
00113
00115 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
00116
00117 BOOST_FOREACH( LinkDescriptor* lp, links )
00118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
00119 return lp;
00120
00121 BOOST_FOREACH( LinkDescriptor* lp, links )
00122 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
00123 return lp;
00124 return NULL;
00125 }
00126
00128 void BaseOverlay::stabilizeLinks() {
00129
00130 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00131 if (!ld->up) continue;
00132 OverlayMsg msg( OverlayMsg::typeLinkAlive,
00133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
00134 if (ld->relayed) msg.setRouteRecord(true);
00135 send_link( &msg, ld->overlayId );
00136 }
00137
00138
00139 vector<LinkDescriptor*> oldlinks;
00140 time_t now = time(NULL);
00141 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00142
00143
00144 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
00145
00146
00147 ld->keepAliveMissed++;
00148
00149
00150 if (ld->keepAliveMissed > 4) {
00151 logging_info( "Link connection request is stale, closing: " << ld );
00152 oldlinks.push_back( ld );
00153 continue;
00154 }
00155 }
00156
00157 if (!ld->up) continue;
00158
00159
00160 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
00161 ld->relaying = false;
00162
00163
00164 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
00165 oldlinks.push_back( ld );
00166 continue;
00167 }
00168
00169
00170 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
00171 oldlinks.push_back( ld );
00172 continue;
00173 }
00174
00175
00176 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
00177
00178
00179 ld->keepAliveMissed++;
00180
00181
00182 if (ld->keepAliveMissed >= 4) {
00183 logging_info( "Link is stale, closing: " << ld );
00184 oldlinks.push_back( ld );
00185 continue;
00186 }
00187 }
00188 }
00189
00190
00191 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
00192 logging_info( "Link timed out. Dropping " << ld );
00193 ld->relaying = false;
00194 dropLink( ld->overlayId );
00195 }
00196
00197
00198 counter++;
00199 if (counter>=4) showLinks();
00200 if (counter>=4 || counter<0) counter = 0;
00201 }
00202
00203
00204 std::string BaseOverlay::getLinkHTMLInfo() {
00205 std::ostringstream s;
00206 vector<NodeID> nodes;
00207 if (links.size()==0) {
00208 s << "<h2 style=\"color=#606060\">No links established!</h2>";
00209 } else {
00210 s << "<h2 style=\"color=#606060\">Links</h2>";
00211 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
00212 s << "<tr style=\"background-color=#ffe0e0\">";
00213 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
00214 s << "</tr>";
00215
00216 int i=0;
00217 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00218 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
00219 bool found = false;
00220 BOOST_FOREACH(NodeID& id, nodes)
00221 if (id == ld->remoteNode) found = true;
00222 if (found) continue;
00223 i++;
00224 nodes.push_back(ld->remoteNode);
00225 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
00226 else s << "<tr>";
00227 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
00228 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
00229 s << "<td>";
00230 if (ld->routeRecord.size()>1 && ld->relayed) {
00231 for (size_t i=1; i<ld->routeRecord.size(); i++)
00232 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
00233 } else {
00234 s << "Direct";
00235 }
00236 s << "</td>";
00237 s << "</tr>";
00238 }
00239 s << "</table>";
00240 }
00241 return s.str();
00242 }
00243
00245 void BaseOverlay::showLinks() {
00246 int i=0;
00247 logging_info("--- link state -------------------------------");
00248 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00249 logging_info("link " << i << ": " << ld);
00250 i++;
00251 }
00252 logging_info("----------------------------------------------");
00253 }
00254
00256 int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
00257 LinkDescriptor* lhsld = getDescriptor(lhs);
00258 LinkDescriptor* rhsld = getDescriptor(rhs);
00259 if (lhsld==NULL || rhsld==NULL
00260 || !lhsld->up || !rhsld->up
00261 || lhsld->remoteNode != rhsld->remoteNode) return -1;
00262
00263 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
00264 return -1;
00265
00266 return 1;
00267 }
00268
00269
00270
00271
00273 void BaseOverlay::route( OverlayMsg* message ) {
00274
00275
00276 if (message->getNumHops() > message->getTimeToLive()) {
00277 logging_warn("Message exceeded TTL. Dropping message and relay routes"
00278 "for recovery.");
00279 removeRelayNode(message->getDestinationNode());
00280 return;
00281 }
00282
00283
00284 else {
00285
00286 if (message->getDestinationNode() == nodeId) {
00287 logging_warn("Usually I should not route messages to myself!");
00288 Message msg;
00289 msg.encapsulate(message);
00290 handleMessage( &msg, NULL );
00291 } else {
00292
00293 send( message, message->getDestinationNode() );
00294 }
00295 }
00296 }
00297
00299 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
00300 LinkDescriptor* next_link = NULL;
00301
00302
00303 if (destination.isUnspecified()) return -1;
00304
00305
00306 if (destination == nodeId) {
00307 logging_warn("Sent message to myself. Handling message.")
00308 Message msg;
00309 msg.encapsulate(message);
00310 handleMessage( &msg, NULL );
00311 return -1;
00312 }
00313
00314
00315 if (message->isRelayed()) {
00316 next_link = getRelayLinkTo( destination );
00317 if (next_link != NULL) {
00318 next_link->setRelaying();
00319 return bc->sendMessage(next_link->communicationId, message);
00320 } else {
00321 logging_warn("Could not send message. No relay hop found to "
00322 << destination)
00323 return -1;
00324 }
00325 }
00326
00327
00328 else {
00329
00330 LinkID next_id = overlayInterface->getNextLinkId( destination );
00331 if (next_id.isUnspecified()) {
00332 logging_warn("Could not send message. No next hop found to " <<
00333 destination );
00334 return -1;
00335 }
00336
00337
00338 next_link = getDescriptor(next_id);
00339 if (next_link != NULL && next_link->up) {
00340
00341 return send(message, next_link);
00342 }
00343
00344
00345 else {
00346 logging_warn("Could not send message. Link not known or up");
00347 return -1;
00348 }
00349 }
00350
00351
00352 return -1;
00353 }
00354
00356 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
00357
00358 if (ldr == NULL) {
00359 logging_error("Can not send message to " << message->getDestinationAddress());
00360 return -1;
00361 }
00362
00363
00364 if (!ldr->up && !ignore_down) {
00365 logging_error("Can not send message. Link not up:" << ldr );
00366 return -1;
00367 }
00368 LinkDescriptor* ld = NULL;
00369
00370
00371 if (ldr->relayed) {
00372 logging_debug("Resolving direct link for relayed link to "
00373 << ldr->remoteNode);
00374 ld = getRelayLinkTo( ldr->remoteNode );
00375 if (ld==NULL) {
00376 logging_error("No relay path found to link " << ldr );
00377 return -1;
00378 }
00379 ld->setRelaying();
00380 message->setRelayed(true);
00381 } else
00382 ld = ldr;
00383
00384
00385 if (ld->communicationUp) {
00386 logging_debug("send(): Sending message over direct link.");
00387 return bc->sendMessage( ld->communicationId, message );
00388 } else {
00389 logging_error("send(): Could not send message. "
00390 "Not a relayed link and direct link is not up.");
00391 return -1;
00392 }
00393 return -1;
00394 }
00395
00396 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
00397 const ServiceID& service) {
00398 message->setSourceNode(nodeId);
00399 message->setDestinationNode(remote);
00400 message->setService(service);
00401 send( message, remote );
00402 }
00403
00404 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
00405 LinkDescriptor* ld = getDescriptor(link);
00406 if (ld==NULL) {
00407 logging_error("Cannot find descriptor to link id=" << link.toString());
00408 return -1;
00409 }
00410 message->setSourceNode(nodeId);
00411 message->setDestinationNode(ld->remoteNode);
00412
00413 message->setSourceLink(ld->overlayId);
00414 message->setDestinationLink(ld->remoteLink);
00415
00416 message->setService(ld->service);
00417 message->setRelayed(ld->relayed);
00418 return send( message, ld, ignore_down );
00419 }
00420
00421
00422
00424 void BaseOverlay::stabilizeRelays() {
00425 vector<relay_route>::iterator i = relay_routes.begin();
00426 while (i!=relay_routes.end() ) {
00427 relay_route& route = *i;
00428 LinkDescriptor* ld = getDescriptor(route.link);
00429
00430
00431 if (ld==NULL
00432 || !ld->isDirectVital()
00433 || difftime(route.used, time(NULL)) > 8) {
00434 logging_info("Forgetting relay information to node "
00435 << route.node.toString() );
00436 i = relay_routes.erase(i);
00437 } else
00438 i++;
00439 }
00440 }
00441
00442 void BaseOverlay::removeRelayLink( const LinkID& link ) {
00443 vector<relay_route>::iterator i = relay_routes.begin();
00444 while (i!=relay_routes.end() ) {
00445 relay_route& route = *i;
00446 if (route.link == link ) i = relay_routes.erase(i); else i++;
00447 }
00448 }
00449
00450 void BaseOverlay::removeRelayNode( const NodeID& remote ) {
00451 vector<relay_route>::iterator i = relay_routes.begin();
00452 while (i!=relay_routes.end() ) {
00453 relay_route& route = *i;
00454 if (route.node == remote ) i = relay_routes.erase(i); else i++;
00455 }
00456 }
00457
00459 void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
00460
00461
00462 if (ld == NULL
00463 || ld->relayed
00464 || message->getSourceNode()==nodeId ) return;
00465
00466
00467 if (message->isRelayed()) {
00468
00469 BOOST_FOREACH( relay_route& route, relay_routes ) {
00470
00471 if ( route.node == message->getDestinationNode() ) {
00472 ld->setRelaying();
00473 route.used = time(NULL);
00474 }
00475 }
00476
00477 }
00478
00479
00480 if (message->isRegisterRelay()) {
00481
00482 ld->setRelaying();
00483
00484
00485 BOOST_FOREACH( relay_route& route, relay_routes ) {
00486
00487
00488 if ( route.node == message->getSourceNode() ) {
00489
00490
00491 route.used = time(NULL);
00492 LinkDescriptor* rld = getDescriptor(route.link);
00493
00494
00495 if (route.hops > message->getNumHops()
00496 || rld == NULL
00497 || !rld->isDirectVital()) {
00498 logging_info("Updating relay information to node "
00499 << route.node.toString()
00500 << " reducing to " << message->getNumHops() << " hops.");
00501 route.hops = message->getNumHops();
00502 route.link = ld->overlayId;
00503 }
00504 return;
00505 }
00506 }
00507
00508
00509 relay_route route;
00510 route.hops = message->getNumHops();
00511 route.link = ld->overlayId;
00512 route.node = message->getSourceNode();
00513 route.used = time(NULL);
00514 logging_info("Remembering relay information to node "
00515 << route.node.toString());
00516 relay_routes.push_back(route);
00517 }
00518 }
00519
00521 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
00522
00523 BOOST_FOREACH( relay_route& route, relay_routes ) {
00524 if (route.node == remote ) {
00525 LinkDescriptor* ld = getDescriptor( route.link );
00526 if (ld==NULL || !ld->isDirectVital()) return NULL; else {
00527 route.used = time(NULL);
00528 return ld;
00529 }
00530 }
00531 }
00532 return NULL;
00533 }
00534
00535
00536
00537
00538
00539 use_logging_cpp(BaseOverlay);
00540
00541
00542
00543 BaseOverlay::BaseOverlay() :
00544 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
00545 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
00546 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
00547 }
00548
00549 BaseOverlay::~BaseOverlay() {
00550 }
00551
00552
00553
00554 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
00555 logging_info("Starting...");
00556
00557
00558 bc = &_basecomm;
00559 nodeId = _nodeid;
00560
00561
00562 bc->registerMessageReceiver( this );
00563 bc->registerEventListener( this );
00564
00565
00566 Timer::setInterval( 1000 );
00567 Timer::start();
00568
00569 started = true;
00570 state = BaseOverlayStateInvalid;
00571 }
00572
00573 void BaseOverlay::stop() {
00574 logging_info("Stopping...");
00575
00576
00577 Timer::stop();
00578
00579
00580 if(overlayInterface != NULL) {
00581 delete overlayInterface;
00582 overlayInterface = NULL;
00583 }
00584
00585
00586 bc->unregisterMessageReceiver( this );
00587 bc->unregisterEventListener( this );
00588
00589 started = false;
00590 state = BaseOverlayStateInvalid;
00591 }
00592
00593 bool BaseOverlay::isStarted(){
00594 return started;
00595 }
00596
00597
00598
00599 void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
00600 const EndpointDescriptor& bootstrapEp) {
00601
00602 if(id != spovnetId){
00603 logging_error("attempt to join against invalid spovnet, call initiate first");
00604 return;
00605 }
00606
00607
00608
00609 logging_info( "Starting to join spovnet " << id.toString() <<
00610 " with nodeid " << nodeId.toString());
00611
00612 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
00613
00614
00615 logging_debug("joining spovnet locally");
00616
00617 overlayInterface->joinOverlay();
00618 state = BaseOverlayStateCompleted;
00619 BOOST_FOREACH( NodeListener* i, nodeListeners )
00620 i->onJoinCompleted( spovnetId );
00621
00622
00623
00624
00625 logging_debug("starting overlay bootstrap module");
00626 overlayBootstrap.start(this, spovnetId, nodeId);
00627 overlayBootstrap.publish(bc->getEndpointDescriptor());
00628
00629 } else {
00630
00631
00632 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
00633
00634 const LinkID& lnk = bc->establishLink( bootstrapEp );
00635 bootstrapLinks.push_back(lnk);
00636 logging_info("join process initiated for " << id.toString() << "...");
00637 }
00638 }
00639
00640 void BaseOverlay::leaveSpoVNet() {
00641
00642 logging_info( "Leaving spovnet " << spovnetId );
00643 bool ret = ( state != this->BaseOverlayStateInvalid );
00644
00645 logging_debug("stopping overlay bootstrap module");
00646 overlayBootstrap.stop();
00647 overlayBootstrap.revoke();
00648
00649 logging_debug( "Dropping all auto-links" );
00650
00651
00652 vector<LinkID> servicelinks;
00653 BOOST_FOREACH( LinkDescriptor* ld, links ) {
00654 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
00655 servicelinks.push_back( ld->overlayId );
00656 }
00657
00658
00659 BOOST_FOREACH( LinkID lnk, servicelinks )
00660 dropLink( lnk );
00661
00662
00663 logging_debug( "Leaving overlay" );
00664 if( overlayInterface != NULL )
00665 overlayInterface->leaveOverlay();
00666
00667
00668 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
00669 bc->dropLink( lnk );
00670
00671
00672 state = BaseOverlayStateInvalid;
00673
00674
00675
00676 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
00677 if( ret ) i->onLeaveCompleted( spovnetId );
00678 else i->onLeaveFailed( spovnetId );
00679 }
00680 }
00681
00682 void BaseOverlay::createSpoVNet(const SpoVNetID& id,
00683 const OverlayParameterSet& param,
00684 const SecurityParameterSet& sec,
00685 const QoSParameterSet& qos) {
00686
00687
00688
00689 logging_info( "creating spovnet " + id.toString() <<
00690 " with nodeid " << nodeId.toString() );
00691
00692 spovnetId = id;
00693
00694 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
00695 if( overlayInterface == NULL ) {
00696 logging_fatal( "overlay structure not supported" );
00697 state = BaseOverlayStateInvalid;
00698
00699 BOOST_FOREACH( NodeListener* i, nodeListeners )
00700 i->onJoinFailed( spovnetId );
00701
00702 return;
00703 }
00704 }
00705
00706
00707
00708 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
00709 const NodeID& remoteId, const ServiceID& service ) {
00710
00711
00712 if (!remoteId.isUnspecified())
00713 return establishLink( remoteId, service );
00714 else
00715
00716
00717 if (remoteId.isUnspecified())
00718 return establishDirectLink(remoteEp, service );
00719
00720 }
00721
00723 const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
00724 const ServiceID& service ) {
00725
00727 if( !communicationListeners.contains( service ) ) {
00728 logging_error( "No listener registered for service id=" << service.toString() );
00729 return LinkID::UNSPECIFIED;
00730 }
00731 CommunicationListener* listener = communicationListeners.get( service );
00732 assert( listener != NULL );
00733
00734
00735 LinkDescriptor* ld = addDescriptor();
00736 ld->relayed = false;
00737 ld->listener = listener;
00738 ld->service = service;
00739 ld->communicationId = bc->establishLink( ep );
00740
00742 logging_info("Establishing direct link " << ld->communicationId.toString()
00743 << " using " << ep.toString());
00744
00745 return ld->communicationId;
00746 }
00747
00749 const LinkID BaseOverlay::establishLink( const NodeID& remote,
00750 const ServiceID& service ) {
00751
00752
00753 if (remote == nodeId) return LinkID::UNSPECIFIED;
00754
00755
00756 LinkDescriptor* ld = addDescriptor();
00757 ld->relayed = true;
00758 ld->remoteNode = remote;
00759 ld->service = service;
00760 ld->listener = getListener(ld->service);
00761
00762
00763 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
00764 msg.setSourceLink(ld->overlayId);
00765 msg.setRelayed(true);
00766
00767
00768 logging_info(
00769 "Sending link request with"
00770 << " link=" << ld->overlayId.toString()
00771 << " node=" << ld->remoteNode.toString()
00772 << " serv=" << ld->service.toString()
00773 );
00774
00775
00776 send_node( &msg, ld->remoteNode, ld->service );
00777
00778 return ld->overlayId;
00779 }
00780
00782 void BaseOverlay::dropLink(const LinkID& link) {
00783 logging_info( "Dropping link (initiated locally):" << link.toString() );
00784
00785
00786 LinkDescriptor* ld = getDescriptor(link);
00787 if( ld == NULL ) {
00788 logging_warn( "Can't drop link, link is unknown!");
00789 return;
00790 }
00791
00792
00793 if( ld->messageQueue.size() > 0 ) {
00794 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
00795 << ld->messageQueue.size() << " waiting messages" );
00796 ld->flushQueue();
00797 }
00798
00799
00800 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
00801 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
00802
00803
00804 if (!ld->relaying) {
00805
00806 if (ld->communicationUp) bc->dropLink( ld->communicationId );
00807
00808
00809 eraseDescriptor( ld->overlayId );
00810 } else {
00811 ld->dropAfterRelaying = true;
00812 }
00813 }
00814
00815
00816
00818 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
00819 logging_debug( "Sending data message on link " << link.toString() );
00820
00821
00822 LinkDescriptor* ld = getDescriptor(link);
00823 if( ld == NULL ) {
00824 logging_error("Could not send message. "
00825 << "Link not found id=" << link.toString());
00826 return -1;
00827 }
00828
00829
00830 if( !ld->up ) {
00831 ld->setAutoUsed();
00832 if( ld->autolink ) {
00833 logging_info("Auto-link " << link.toString() << " not up, queue message");
00834 Data data = data_serialize( message );
00835 const_cast<Message*>(message)->dropPayload();
00836 ld->messageQueue.push_back( new Message(data) );
00837 } else {
00838 logging_error("Link " << link.toString() << " not up, drop message");
00839 }
00840 return -1;
00841 }
00842
00843
00844 OverlayMsg overmsg( OverlayMsg::typeData );
00845 overmsg.encapsulate( const_cast<Message*>(message) );
00846
00847
00848 return send_link( &overmsg, ld->overlayId );
00849 }
00850
00851 seqnum_t BaseOverlay::sendMessage(const Message* message,
00852 const NodeID& node, const ServiceID& service) {
00853
00854
00855 LinkDescriptor* ld = getAutoDescriptor( node, service );
00856
00857
00858 if( ld == NULL ) {
00859
00860
00861 logging_info( "No link to send message to node "
00862 << node.toString() << " found for service "
00863 << service.toString() << ". Creating auto link ..."
00864 );
00865
00866
00867 LinkID link = establishLink( node, service );
00868 ld = getDescriptor( link );
00869 if( ld == NULL ) {
00870 logging_error( "Failed to establish auto-link.");
00871 return -1;
00872 }
00873 ld->autolink = true;
00874
00875 logging_debug( "Auto-link establishment in progress to node "
00876 << node.toString() << " with link id=" << link.toString() );
00877 }
00878 assert(ld != NULL);
00879
00880
00881 ld->setAutoUsed();
00882
00883
00884 return sendMessage( message, ld->overlayId );
00885 }
00886
00887
00888
00889 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
00890 const LinkID& link) const {
00891
00892
00893 if( link == LinkID::UNSPECIFIED )
00894 return bc->getEndpointDescriptor();
00895
00896
00897 const LinkDescriptor* ld = getDescriptor(link);
00898 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
00899
00900
00901 return bc->getEndpointDescriptor( ld->communicationId );
00902 }
00903
00904 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
00905 const NodeID& node) const {
00906
00907
00908 if( node == nodeId || node == NodeID::UNSPECIFIED )
00909 return bc->getEndpointDescriptor();
00910
00911
00912 if( overlayInterface == NULL ) {
00913 logging_error( "overlay interface not set, cannot resolve endpoint" );
00914 return EndpointDescriptor::UNSPECIFIED();
00915 }
00916
00917
00918 return overlayInterface->resolveNode( node );
00919 }
00920
00921
00922
00923 bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
00924 sideport = _sideport;
00925 _sideport->configure( this );
00926 }
00927
00928 bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
00929 sideport = &SideportListener::DEFAULT;
00930 }
00931
00932
00933
00934 bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
00935 logging_debug( "binding communication listener " << listener
00936 << " on serviceid " << sid.toString() );
00937
00938 if( communicationListeners.contains( sid ) ) {
00939 logging_error( "some listener already registered for service id "
00940 << sid.toString() );
00941 return false;
00942 }
00943
00944 communicationListeners.registerItem( listener, sid );
00945 return true;
00946 }
00947
00948
00949 bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
00950 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
00951
00952 if( !communicationListeners.contains( sid ) ) {
00953 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
00954 return false;
00955 }
00956
00957 if( communicationListeners.get(sid) != listener ) {
00958 logging_warn( "listener bound to service id " << sid.toString()
00959 << " is different than listener trying to unbind" );
00960 return false;
00961 }
00962
00963 communicationListeners.unregisterItem( sid );
00964 return true;
00965 }
00966
00967
00968
00969 bool BaseOverlay::bind(NodeListener* listener) {
00970 logging_debug( "Binding node listener " << listener );
00971
00972
00973 NodeListenerVector::iterator i =
00974 find( nodeListeners.begin(), nodeListeners.end(), listener );
00975 if( i != nodeListeners.end() ) {
00976 logging_warn("Node listener " << listener << " is already bound!" );
00977 return false;
00978 }
00979
00980
00981 nodeListeners.push_back( listener );
00982 return true;
00983 }
00984
00985 bool BaseOverlay::unbind(NodeListener* listener) {
00986 logging_debug( "Unbinding node listener " << listener );
00987
00988
00989 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
00990 if( i == nodeListeners.end() ) {
00991 logging_warn( "Node listener " << listener << " is not bound!" );
00992 return false;
00993 }
00994
00995
00996 nodeListeners.erase( i );
00997 return true;
00998 }
00999
01000
01001
01002 void BaseOverlay::onLinkUp(const LinkID& id,
01003 const address_v* local, const address_v* remote) {
01004 logging_debug( "Link up with base communication link id=" << id );
01005
01006
01007 LinkDescriptor* ld = getDescriptor(id, true);
01008
01009
01010 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
01011 logging_info(
01012 "Join has been initiated by me and the link is now up. " <<
01013 "Sending out join request for SpoVNet " << spovnetId.toString()
01014 );
01015
01016
01017 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
01018 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
01019 JoinRequest joinRequest( spovnetId, nodeId );
01020 overlayMsg.encapsulate( &joinRequest );
01021 bc->sendMessage( id, &overlayMsg );
01022 return;
01023 }
01024
01025
01026 if (ld == NULL) {
01027 ld = addDescriptor( id );
01028 logging_info( "onLinkUp (remote request) descriptor: " << ld );
01029
01030
01031 ld->fromRemote = true;
01032 ld->communicationId = id;
01033 ld->communicationUp = true;
01034 ld->setAutoUsed();
01035 ld->setAlive();
01036
01037
01038
01039
01040
01041 } else {
01042 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
01043
01044
01045 ld->setAutoUsed();
01046 ld->setAlive();
01047 ld->communicationUp = true;
01048 ld->fromRemote = false;
01049
01050
01051 if (ld->relayed) {
01052 logging_info( "Converting to direct link: " << ld );
01053 ld->up = true;
01054 ld->relayed = false;
01055 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
01056 overMsg.setSourceLink( ld->overlayId );
01057 overMsg.setDestinationLink( ld->remoteLink );
01058 send_link( &overMsg, ld->overlayId );
01059 } else {
01060
01061 logging_info( "Sending out update" <<
01062 " for service " << ld->service.toString() <<
01063 " with local node id " << nodeId.toString() <<
01064 " on link " << ld->overlayId.toString() );
01065
01066
01067 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
01068 overlayMsg.setSourceLink(ld->overlayId);
01069 overlayMsg.setAutoLink( ld->autolink );
01070 send_link( &overlayMsg, ld->overlayId, true );
01071 }
01072 }
01073 }
01074
01075 void BaseOverlay::onLinkDown(const LinkID& id,
01076 const address_v* local, const address_v* remote) {
01077
01078
01079 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
01080 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
01081
01082
01083 LinkDescriptor* ld = getDescriptor(id, true);
01084 if ( ld == NULL ) return;
01085 logging_info( "onLinkDown descriptor: " << ld );
01086
01087
01088 removeRelayLink(ld->overlayId);
01089
01090
01091 ld->communicationUp = false;
01092 if (!ld->service.isUnspecified()) {
01093 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
01094 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
01095 }
01096
01097
01098 if( ld->messageQueue.size() > 0 ) {
01099 logging_warn( "Dropping link " << id.toString() << " that has "
01100 << ld->messageQueue.size() << " waiting messages" );
01101 ld->flushQueue();
01102 }
01103
01104
01105 eraseDescriptor(ld->overlayId);
01106 }
01107
01108 void BaseOverlay::onLinkChanged(const LinkID& id,
01109 const address_v* oldlocal, const address_v* newlocal,
01110 const address_v* oldremote, const address_v* newremote) {
01111
01112
01113 LinkDescriptor* ld = getDescriptor(id, true);
01114 if ( ld == NULL ) return;
01115 logging_debug( "onLinkChanged descriptor: " << ld );
01116
01117
01118 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
01119 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
01120
01121
01122 ld->setAutoUsed();
01123 }
01124
01125 void BaseOverlay::onLinkFail(const LinkID& id,
01126 const address_v* local, const address_v* remote) {
01127 logging_debug( "Link fail with base communication link id=" << id );
01128
01129
01130 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
01131 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
01132
01133
01134 LinkDescriptor* ld = getDescriptor(id, true);
01135 if ( ld == NULL ) return;
01136 logging_debug( "Link failed id=" << ld->overlayId.toString() );
01137
01138
01139 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
01140 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
01141 }
01142
01143 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
01144 const address_v* remote, const QoSParameterSet& qos) {
01145 logging_debug( "Link quality changed with base communication link id=" << id );
01146
01147
01148 LinkDescriptor* ld = getDescriptor(id, true);
01149 if ( ld == NULL ) return;
01150 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
01151 }
01152
01153 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
01154 const address_v* remote ) {
01155 logging_debug("Accepting link request from " << remote->to_string() );
01156 return true;
01157 }
01158
01160 bool BaseOverlay::receiveMessage(const Message* message,
01161 const LinkID& link, const NodeID& ) {
01162
01163 LinkDescriptor* ld = getDescriptor( link, true );
01164 return handleMessage( message, ld, link );
01165 }
01166
01167
01168
01170 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
01171
01172
01173 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
01174 logging_info( "Received join request for spovnet " <<
01175 joinReq->getSpoVNetID().toString() );
01176
01177
01178 if( joinReq->getSpoVNetID() != spovnetId ) {
01179 logging_error(
01180 "Received join request for spovnet we don't handle " <<
01181 joinReq->getSpoVNetID().toString() );
01182 return false;
01183 }
01184
01185
01186 bool allow = true;
01187 logging_info( "Sending join reply for spovnet " <<
01188 spovnetId.toString() << " to node " <<
01189 overlayMsg->getSourceNode().toString() <<
01190 ". Result: " << (allow ? "allowed" : "denied") );
01191 joiningNodes.push_back( overlayMsg->getSourceNode() );
01192
01193
01194 assert( overlayInterface != NULL );
01195 logging_debug( "Using bootstrap end-point "
01196 << getEndpointDescriptor().toString() )
01197 OverlayParameterSet parameters = overlayInterface->getParameters();
01198 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
01199 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
01200 JoinReply replyMsg( spovnetId, parameters,
01201 allow, getEndpointDescriptor() );
01202 retmsg.encapsulate(&replyMsg);
01203 bc->sendMessage( bcLink, &retmsg );
01204
01205 return true;
01206 }
01207
01209 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
01210
01211 logging_debug("received join reply message");
01212 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
01213
01214
01215 if( replyMsg->getSpoVNetID() != spovnetId ) {
01216 logging_error( "Received SpoVNet join reply for " <<
01217 replyMsg->getSpoVNetID().toString() <<
01218 " != " << spovnetId.toString() );
01219 delete replyMsg;
01220 return false;
01221 }
01222
01223
01224 if( !replyMsg->getJoinAllowed() ) {
01225 logging_error( "Our join request has been denied" );
01226
01227
01228 if( !bcLink.isUnspecified() ){
01229 bc->dropLink( bcLink );
01230
01231 vector<LinkID>::iterator it = std::find(
01232 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
01233 if( it != bootstrapLinks.end() )
01234 bootstrapLinks.erase(it);
01235 }
01236
01237
01238 BOOST_FOREACH( NodeListener* i, nodeListeners )
01239 i->onJoinFailed( spovnetId );
01240
01241 delete replyMsg;
01242 return true;
01243 }
01244
01245
01246 logging_info("Join request has been accepted for spovnet " <<
01247 spovnetId.toString() );
01248
01249 logging_debug( "Using bootstrap end-point "
01250 << replyMsg->getBootstrapEndpoint().toString() );
01251
01252
01253
01254 if( overlayInterface == NULL ){
01255
01256 logging_debug("first-time bootstrapping");
01257
01258 overlayInterface = OverlayFactory::create(
01259 *this, replyMsg->getParam(), nodeId, this );
01260
01261
01262 if( overlayInterface == NULL ) {
01263 logging_error( "overlay structure not supported" );
01264
01265 if( !bcLink.isUnspecified() ){
01266 bc->dropLink( bcLink );
01267
01268 vector<LinkID>::iterator it = std::find(
01269 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
01270 if( it != bootstrapLinks.end() )
01271 bootstrapLinks.erase(it);
01272 }
01273
01274
01275 BOOST_FOREACH( NodeListener* i, nodeListeners )
01276 i->onJoinFailed( spovnetId );
01277
01278 delete replyMsg;
01279 return true;
01280 }
01281
01282
01283 state = BaseOverlayStateCompleted;
01284 overlayInterface->createOverlay();
01285
01286 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
01287 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
01288
01289
01290
01291
01292
01293 BOOST_FOREACH( NodeListener* i, nodeListeners )
01294 i->onJoinCompleted( spovnetId );
01295
01296 delete replyMsg;
01297
01298 } else {
01299
01300
01301 logging_debug("not first-time bootstrapping");
01302 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
01303 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
01304
01305 delete replyMsg;
01306
01307 }
01308
01309 return true;
01310 }
01311
01312
01313 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01314
01315 const ServiceID& service = overlayMsg->getService();
01316 logging_debug( "Received data for service " << service.toString()
01317 << " on link " << overlayMsg->getDestinationLink().toString() );
01318
01319
01320 getListener(service)->onMessage(
01321 overlayMsg,
01322 overlayMsg->getSourceNode(),
01323 overlayMsg->getDestinationLink()
01324 );
01325
01326 return true;
01327 }
01328
01329
01330 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01331
01332 if( ld == NULL ) {
01333 logging_warn( "received overlay update message for link for "
01334 << "which we have no mapping" );
01335 return false;
01336 }
01337 logging_info("Received type update message on link " << ld );
01338
01339
01340 bool changed =
01341 ( ld->remoteNode != overlayMsg->getSourceNode() )
01342 || ( ld->service != overlayMsg->getService() );
01343
01344
01345 ld->up = true;
01346 ld->remoteNode = overlayMsg->getSourceNode();
01347 ld->remoteLink = overlayMsg->getSourceLink();
01348 ld->service = overlayMsg->getService();
01349 ld->autolink = overlayMsg->isAutoLink();
01350
01351
01352 if( changed ) {
01353 overlayMsg->swapRoles();
01354 overlayMsg->setSourceNode(nodeId);
01355 overlayMsg->setSourceLink(ld->overlayId);
01356 overlayMsg->setService(ld->service);
01357 send( overlayMsg, ld );
01358 }
01359
01360
01361 if( !communicationListeners.contains( ld->service ) ) {
01362 logging_warn( "Link up: event listener has not been registered" );
01363 return false;
01364 }
01365
01366
01367 CommunicationListener* listener = communicationListeners.get( ld->service );
01368 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
01369 logging_warn("Link up: event listener is default or null!" );
01370 return true;
01371 }
01372
01373
01374 ld->listener = listener;
01375 ld->setAutoUsed();
01376 ld->setAlive();
01377
01378
01379 if( !listener->onLinkRequest(ld->remoteNode) ) {
01380
01381 logging_debug("Link id=" << ld->overlayId.toString() <<
01382 " has been denied by service " << ld->service.toString() << ", dropping link");
01383
01384
01385 ld->listener = &CommunicationListener::DEFAULT;
01386
01387
01388 dropLink( ld->overlayId );
01389 return true;
01390 }
01391
01392
01393 ld->up = true;
01394 logging_info( "Link has been accepted by service and is up: " << ld );
01395
01396
01397 if( ld->messageQueue.size() > 0 ) {
01398 logging_info( "Sending out queued messages on link " << ld );
01399 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
01400 sendMessage( msg, ld->overlayId );
01401 delete msg;
01402 }
01403 ld->messageQueue.clear();
01404 }
01405
01406
01407 listener->onLinkUp( ld->overlayId, ld->remoteNode );
01408 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
01409
01410 return true;
01411 }
01412
01414 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01415 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
01416
01417
01418
01419
01420 LinkDescriptor* ldn = addDescriptor();
01421
01422
01423 ldn->up = true;
01424 ldn->fromRemote = true;
01425 ldn->relayed = true;
01426
01427
01428 ldn->service = overlayMsg->getService();
01429 ldn->listener = getListener(ldn->service);
01430 ldn->remoteNode = overlayMsg->getSourceNode();
01431 ldn->remoteLink = overlayMsg->getSourceLink();
01432
01433
01434 ldn->setAlive();
01435 ldn->setAutoUsed();
01436
01437
01438 overlayMsg->swapRoles();
01439 overlayMsg->setType(OverlayMsg::typeLinkReply);
01440 overlayMsg->setSourceLink(ldn->overlayId);
01441 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
01442 overlayMsg->setRelayed(true);
01443 send( overlayMsg, ld );
01444
01445
01446 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
01447
01448 return true;
01449 }
01450
01451 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01452
01453
01454 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
01455
01456
01457 if (ldn == NULL) {
01458 logging_error( "No link request pending for "
01459 << overlayMsg->getDestinationLink().toString() );
01460 return false;
01461 }
01462 logging_debug("Handling link reply for " << ldn )
01463
01464
01465 if (ldn->up) {
01466 logging_warn( "Link already up: " << ldn );
01467 return true;
01468 }
01469
01470
01471 logging_debug( "Link request reply received. Establishing link"
01472 << " for service " << overlayMsg->getService().toString()
01473 << " with local id=" << overlayMsg->getDestinationLink()
01474 << " and remote link id=" << overlayMsg->getSourceLink()
01475 << " to " << overlayMsg->getSourceEndpoint().toString()
01476 );
01477
01478
01479 ldn->up = true;
01480 ldn->relayed = true;
01481 ldn->service = overlayMsg->getService();
01482 ldn->listener = getListener(ldn->service);
01483 ldn->remoteLink = overlayMsg->getSourceLink();
01484 ldn->remoteNode = overlayMsg->getSourceNode();
01485
01486
01487 ldn->setAlive();
01488 ldn->setAutoUsed();
01489
01490
01491 if( ldn->messageQueue.size() > 0 ) {
01492 logging_info( "Sending out queued messages on link " <<
01493 ldn->overlayId.toString() );
01494 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
01495 sendMessage( msg, ldn->overlayId );
01496 delete msg;
01497 }
01498 ldn->messageQueue.clear();
01499 }
01500
01501
01502 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
01503
01504
01505 ldn->communicationId =
01506 bc->establishLink( overlayMsg->getSourceEndpoint() );
01507
01508 return true;
01509 }
01510
01512 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01513 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
01514 if ( rld != NULL ) {
01515 logging_debug("Keep-Alive for " <<
01516 overlayMsg->getDestinationLink() );
01517 if (overlayMsg->isRouteRecord())
01518 rld->routeRecord = overlayMsg->getRouteRecord();
01519 rld->setAlive();
01520 return true;
01521 } else {
01522 logging_error("Keep-Alive for "
01523 << overlayMsg->getDestinationLink() << ": link unknown." );
01524 return false;
01525 }
01526 }
01527
01529 bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
01530 logging_debug( "Received direct link replacement request" );
01531
01533 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
01534 if (rld == NULL || ld == NULL) {
01535 logging_error("Direct link replacement: Link "
01536 << overlayMsg->getDestinationLink() << "not found error." );
01537 return false;
01538 }
01539 logging_info( "Received direct link convert notification for " << rld );
01540
01541
01542 rld->communicationId = ld->communicationId;
01543 rld->communicationUp = true;
01544 rld->relayed = false;
01545
01546
01547 rld->setAlive();
01548 rld->setAutoUsed();
01549
01550
01551 eraseDescriptor(ld->overlayId);
01552 }
01553
01555 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
01556 const LinkID bcLink ) {
01557 logging_debug( "Handling message: " << message->toString());
01558
01559
01560 OverlayMsg* overlayMsg =
01561 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
01562 if( overlayMsg == NULL ) return false;
01563
01564
01565 overlayMsg->increaseNumHops();
01566
01567
01568 refreshRelayInformation( overlayMsg, ld );
01569
01570
01571 overlayMsg->addRouteRecord(nodeId);
01572
01573
01574 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
01575 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
01576 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
01577 delete overlayMsg;
01578 return true;
01579 }
01580
01581
01582 if (!overlayMsg->getDestinationNode().isUnspecified() &&
01583 overlayMsg->getDestinationNode() != nodeId ) {
01584 logging_debug("Routing message "
01585 << " from " << overlayMsg->getSourceNode()
01586 << " to " << overlayMsg->getDestinationNode()
01587 );
01588 route( overlayMsg );
01589 delete overlayMsg;
01590 return true;
01591 }
01592
01593
01594 bool ret = false;
01595 switch ( overlayMsg->getType() ) {
01596
01597
01598 case OverlayMsg::typeData:
01599 ret = handleData(overlayMsg, ld); break;
01600
01601
01602 case OverlayMsg::typeJoinRequest:
01603 ret = handleJoinRequest(overlayMsg, bcLink ); break;
01604 case OverlayMsg::typeJoinReply:
01605 ret = handleJoinReply(overlayMsg, bcLink ); break;
01606
01607
01608 case OverlayMsg::typeLinkRequest:
01609 ret = handleLinkRequest(overlayMsg, ld ); break;
01610 case OverlayMsg::typeLinkReply:
01611 ret = handleLinkReply(overlayMsg, ld ); break;
01612 case OverlayMsg::typeLinkUpdate:
01613 ret = handleLinkUpdate(overlayMsg, ld ); break;
01614 case OverlayMsg::typeLinkAlive:
01615 ret = handleLinkAlive(overlayMsg, ld ); break;
01616 case OverlayMsg::typeLinkDirect:
01617 ret = handleLinkDirect(overlayMsg, ld ); break;
01618
01619
01620 default: {
01621 logging_error( "received message in invalid state! don't know " <<
01622 "what to do with this message of type " << overlayMsg->getType() );
01623 ret = false;
01624 break;
01625 }
01626 }
01627
01628
01629 delete overlayMsg;
01630 return ret;
01631 }
01632
01633
01634
01635 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
01636
01637 logging_debug( "broadcasting message to all known nodes " <<
01638 "in the overlay from service " + service.toString() );
01639
01640 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
01641 OverlayInterface::NodeList::iterator i = nodes.begin();
01642 for(; i != nodes.end(); i++ ) {
01643 if( *i == nodeId) continue;
01644 sendMessage( message, *i, service );
01645 }
01646 }
01647
01649 vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
01650
01651 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
01652 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
01653 if( i != nodes.end() ) nodes.erase( i );
01654 return nodes;
01655 }
01656
01657 const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
01658 if( lid == LinkID::UNSPECIFIED ) return nodeId;
01659 const LinkDescriptor* ld = getDescriptor(lid);
01660 if( ld == NULL ) return NodeID::UNSPECIFIED;
01661 else return ld->remoteNode;
01662 }
01663
01664 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
01665 vector<LinkID> linkvector;
01666 BOOST_FOREACH( LinkDescriptor* ld, links ) {
01667 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
01668 linkvector.push_back( ld->overlayId );
01669 }
01670 }
01671 return linkvector;
01672 }
01673
01674
01675 void BaseOverlay::onNodeJoin(const NodeID& node) {
01676 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
01677 if( i == joiningNodes.end() ) return;
01678
01679 logging_info( "node has successfully joined baseoverlay and overlay structure "
01680 << node.toString() );
01681
01682 joiningNodes.erase( i );
01683 }
01684
01685 void BaseOverlay::eventFunction() {
01686 stabilizeRelays();
01687 stabilizeLinks();
01688 }
01689
01690 }}