source: source/ariba/overlay/BaseOverlay.cpp@ 5403

Last change on this file since 5403 was 5316, checked in by Christoph Mayer, 15 years ago

merge from bootstrap branch

File size: 48.6 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62#define logging_force(x) std::cout << x << std::endl;
63#define logging_force1(x) std::cout << x << std::endl;
64
65LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
66 BOOST_FOREACH( LinkDescriptor* lp, links )
67 if ((communication ? lp->communicationId : lp->overlayId) == link)
68 return lp;
69 return NULL;
70}
71
72const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
73 BOOST_FOREACH( const LinkDescriptor* lp, links )
74 if ((communication ? lp->communicationId : lp->overlayId) == link)
75 return lp;
76 return NULL;
77}
78
79LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
80 BOOST_FOREACH( LinkDescriptor* lp, links )
81 if (lp->autolink && lp->remoteNode == node && lp->service == service)
82 return lp;
83 return NULL;
84}
85
86void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
87 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
88 LinkDescriptor* ld = *i;
89 if ((communication ? ld->communicationId : ld->overlayId) == link) {
90 delete ld;
91 links.erase(i);
92 break;
93 }
94 }
95}
96
97LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
98 LinkDescriptor* desc = getDescriptor( link );
99 if ( desc == NULL ) {
100 desc = new LinkDescriptor();
101 desc->overlayId = link;
102 links.push_back(desc);
103 }
104 return desc;
105}
106
107/// returns a direct link relay descriptor to the given relay node
108LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) {
109 BOOST_FOREACH( LinkDescriptor* lp, links )
110 if (lp->remoteNode == relayNode &&
111 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
112 lp->relay == false &&
113 lp->up)
114 return lp;
115 return NULL;
116}
117
118/// find a proper relay node
119const NodeID BaseOverlay::findRelayNode( const NodeID& id ) {
120 LinkDescriptor* rld = NULL;
121 NodeID relayNode = NodeID::UNSPECIFIED;
122
123 // get used next hop towards node
124 LinkID rlid = overlayInterface->getNextLinkId(id);
125 while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
126
127 // get descriptor of first hop
128 rld = getDescriptor(rlid);
129 logging_force1( rld );
130
131 // is first hop a relay path? yes-> try to find real link!
132 if ( rld->relay )
133 relayNode = getRelayDescriptor(rld->localRelay)->remoteNode;
134
135 // no-> a proper relay node has been found
136 else relayNode = rld->remoteNode;
137 }
138 logging_force1( "Potential relay node " << relayNode.toString() );
139 // do not return myself or use the node as relay node
140 if (relayNode == nodeId)
141 return NodeID::UNSPECIFIED;
142 else {
143 logging_force1( "Returning relay node " << relayNode.toString() );
144 return relayNode;
145 }
146}
147
148/// forwards a message over relays/overlay/directly using link descriptor
149seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
150
151 // directly send message
152 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
153 logging_debug("sendMessage: Sending message via Base Communication");
154 return bc->sendMessage( ld->communicationId, message );
155 }
156
157 // relay message
158 else if ( ld->relay ) {
159 logging_debug("sendMessage: Relaying message to node "
160 << ld->remoteNode.toString()
161 << " using relay " << ld->localRelay
162 );
163
164 // get local relay link descriptor and mark as used for relaying
165 LinkDescriptor* rld = getRelayDescriptor(ld->localRelay);
166 if (rld==NULL) {
167 logging_error("sendMessage: Relay descriptor for relay " <<
168 ld->localRelay.toString() << " unknown.");
169 return -1;
170 }
171 rld->markAsRelay();
172
173 // create a information relay message to inform the relay about
174 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
175 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
176 relayMsg.encapsulate( message );
177 overlay_msg.encapsulate( &relayMsg );
178
179 // route message to relay node in order to inform it!
180 logging_debug("sendMessage: Sending message over relayed link with" << ld );
181 overlayInterface->routeMessage(rld->remoteNode, rld->overlayId, &overlay_msg);
182 return 0;
183 }
184
185 // route message using overlay
186 else {
187 logging_error("Could not send message descriptor=" << ld );
188 logging_debug( "sendMessage: Routing message to node " << ld->remoteNode.toString() );
189 overlayInterface->routeMessage( ld->remoteNode, message );
190 return 0;
191 }
192
193 return -1;
194}
195
196/// creates a link descriptor, apply relay semantics if possible
197LinkDescriptor* BaseOverlay::createLinkDescriptor(
198 const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ) {
199
200 // find listener
201 if( !communicationListeners.contains( service ) ) {
202 logging_error( "No listener found for service " << service.toString() );
203 return NULL;
204 }
205 CommunicationListener* listener = communicationListeners.get( service );
206 assert( listener != NULL );
207
208 // copy link id
209 LinkID linkid = link_id;
210
211 // create link id if necessary
212 if ( linkid.isUnspecified() )
213 linkid = LinkID::create();
214
215 // create relay link descriptor
216 NodeID relayNode = findRelayNode(remoteNode);
217
218 // add descriptor
219 LinkDescriptor* ld = addDescriptor( linkid );
220 ld->overlayId = linkid;
221 ld->service = service;
222 ld->listener = listener;
223 ld->remoteNode = remoteNode;
224
225 // set relay node if available
226 ld->relay = !relayNode.isUnspecified();
227 ld->localRelay = relayNode;
228
229 if (!ld->relay)
230 logging_error("No relay found!");
231
232 // debug output
233 logging_debug( "Created link descriptor: " << ld );
234
235 return ld;
236}
237
238
239// ----------------------------------------------------------------------------
240
241use_logging_cpp(BaseOverlay);
242
243// ----------------------------------------------------------------------------
244
245BaseOverlay::BaseOverlay() :
246 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
247 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
248 sideport(&SideportListener::DEFAULT), started(false) {
249}
250
251BaseOverlay::~BaseOverlay() {
252}
253
254// ----------------------------------------------------------------------------
255
256void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
257 logging_info("Starting...");
258
259 // set parameters
260 bc = &_basecomm;
261 nodeId = _nodeid;
262
263 // register at base communication
264 bc->registerMessageReceiver( this );
265 bc->registerEventListener( this );
266
267 // timer for auto link management
268 Timer::setInterval( 500 );
269 Timer::start();
270
271 started = true;
272 state = BaseOverlayStateInvalid;
273}
274
275void BaseOverlay::stop() {
276 logging_info("Stopping...");
277
278 // stop timer
279 Timer::stop();
280
281 // delete oberlay interface
282 if(overlayInterface != NULL) {
283 delete overlayInterface;
284 overlayInterface = NULL;
285 }
286
287 // unregister at base communication
288 bc->unregisterMessageReceiver( this );
289 bc->unregisterEventListener( this );
290
291 started = false;
292 state = BaseOverlayStateInvalid;
293}
294
295bool BaseOverlay::isStarted(){
296 return started;
297}
298
299// ----------------------------------------------------------------------------
300
301void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
302 const EndpointDescriptor& bootstrapEp) {
303
304 if(id != spovnetId){
305 logging_error("attempt to join against invalid spovnet, call initiate first");
306 return;
307 }
308
309
310 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
311 logging_info( "Starting to join spovnet " << id.toString() <<
312 " with nodeid " << nodeId.toString());
313
314 if(bootstrapEp == EndpointDescriptor::UNSPECIFIED && state == BaseOverlayStateInvalid){
315
316 // bootstrap against ourselfs
317 logging_debug("joining spovnet locally");
318
319 overlayInterface->joinOverlay();
320 state = BaseOverlayStateCompleted;
321 BOOST_FOREACH( NodeListener* i, nodeListeners )
322 i->onJoinCompleted( spovnetId );
323
324 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
325 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
326
327 logging_debug("starting overlay bootstrap module");
328 overlayBootstrap.start(this, spovnetId, nodeId);
329 overlayBootstrap.publish(bc->getEndpointDescriptor());
330
331 } else {
332
333 // bootstrap against another node
334 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
335
336 const LinkID& lnk = bc->establishLink( bootstrapEp );
337 bootstrapLinks.push_back(lnk);
338 logging_info("join process initiated for " << id.toString() << "...");
339 }
340}
341
342void BaseOverlay::leaveSpoVNet() {
343
344 logging_info( "Leaving spovnet " << spovnetId );
345 bool ret = ( state != this->BaseOverlayStateInvalid );
346
347 logging_debug("stopping overlay bootstrap module");
348 overlayBootstrap.stop();
349 overlayBootstrap.revoke();
350
351 logging_debug( "Dropping all auto-links" );
352
353 // gather all service links
354 vector<LinkID> servicelinks;
355 BOOST_FOREACH( LinkDescriptor* ld, links ) {
356 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
357 servicelinks.push_back( ld->overlayId );
358 }
359
360 // drop all service links
361 BOOST_FOREACH( LinkID lnk, servicelinks )
362 dropLink( lnk );
363
364 // let the node leave the spovnet overlay interface
365 logging_debug( "Leaving overlay" );
366 if( overlayInterface != NULL )
367 overlayInterface->leaveOverlay();
368
369 // drop still open bootstrap links
370 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
371 bc->dropLink( lnk );
372
373 // change to inalid state
374 state = BaseOverlayStateInvalid;
375 //ovl.visShutdown( ovlId, nodeId, string("") );
376
377 // inform all registered services of the event
378 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
379 if( ret ) i->onLeaveCompleted( spovnetId );
380 else i->onLeaveFailed( spovnetId );
381 }
382}
383
384void BaseOverlay::createSpoVNet(const SpoVNetID& id,
385 const OverlayParameterSet& param,
386 const SecurityParameterSet& sec,
387 const QoSParameterSet& qos) {
388
389 // set the state that we are an initiator, this way incoming messages are
390 // handled correctly
391 logging_info( "creating spovnet " + id.toString() <<
392 " with nodeid " << nodeId.toString() );
393
394 spovnetId = id;
395
396 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
397 if( overlayInterface == NULL ) {
398 logging_fatal( "overlay structure not supported" );
399 state = BaseOverlayStateInvalid;
400
401 BOOST_FOREACH( NodeListener* i, nodeListeners )
402 i->onJoinFailed( spovnetId );
403
404 return;
405 }
406}
407
408// ----------------------------------------------------------------------------
409
410const LinkID BaseOverlay::establishLink(
411 const EndpointDescriptor& ep, const NodeID& nodeid,
412 const ServiceID& service, const LinkID& linkid ) {
413
414 LinkID link_id = linkid;
415
416 // establish link via overlay
417 if (!nodeid.isUnspecified())
418 link_id = establishLink( nodeid, service, link_id );
419
420 // establish link directly if only ep is known
421 if (nodeid.isUnspecified())
422 establishLink( ep, service, link_id );
423
424 return link_id;
425}
426
427/// call base communication's establish link and add link mapping
428const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
429 const ServiceID& service, const LinkID& linkid ) {
430
431 // create a new link id if necessary
432 LinkID link_id = linkid;
433 if (link_id.isUnspecified()) link_id = LinkID::create();
434
435 /// find a service listener
436 if( !communicationListeners.contains( service ) ) {
437 logging_error( "No listener registered for service id=" << service.toString() );
438 return LinkID::UNSPECIFIED;
439 }
440 CommunicationListener* listener = communicationListeners.get( service );
441 assert( listener != NULL );
442
443 /// establish link and add mapping
444 logging_info("Establishing direct link " << link_id.toString()
445 << " using " << ep.toString());
446
447 // create descriptor
448 LinkDescriptor* ld = addDescriptor( link_id );
449 ld->overlayId = link_id;
450 ld->communicationId = link_id;
451 ld->listener = listener;
452 ld->service = service;
453 bc->establishLink( ep, link_id );
454
455 return link_id;
456}
457
458/// establishes a link between two arbitrary nodes
459const LinkID BaseOverlay::establishLink( const NodeID& node,
460 const ServiceID& service, const LinkID& link_id ) {
461
462 // do not establish a link to myself!
463 if (node == nodeId) return LinkID::UNSPECIFIED;
464
465 // create a link descriptor
466 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
467
468 // create link request message with own link id
469 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
470 LinkRequest link_request_msg(
471 nonce, &bc->getEndpointDescriptor(), false,
472 ld->overlayId, ld->localRelay );
473 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
474 overlay_msg.encapsulate( &link_request_msg );
475 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
476
477 // debug message
478 logging_debug(
479 "Sending link request with"
480 << " link id=" << ld->overlayId
481 << " node id=" << ld->remoteNode.toString()
482 << " service id=" << ld->service.toString()
483 << " local relay id=" << ld->localRelay.toString()
484 << " nonce= " << nonce
485 );
486
487 // sending message through new link
488 sendMessage( &overlay_msg, ld );
489
490 return ld->overlayId;
491}
492
493/// drops an established link
494void BaseOverlay::dropLink(const LinkID& link) {
495 logging_debug( "Dropping link (initiated locally):" << link.toString() );
496
497 // find the link item to drop
498 LinkDescriptor* ld = getDescriptor(link);
499 if( ld == NULL ) {
500 logging_warn( "Can't drop link, link is unknown!");
501 return;
502 }
503
504 // delete all queued messages
505 if( ld->messageQueue.size() > 0 ) {
506 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
507 << ld->messageQueue.size() << " waiting messages" );
508 ld->flushQueue();
509 }
510
511 // inform sideport and listener
512 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
513 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
514
515 // do not drop relay links
516 if (!ld->usedAsRelay) {
517 // drop the link in base communication
518 if (ld->communicationUp) bc->dropLink( ld->communicationId );
519
520 // erase descriptor
521 eraseDescriptor( ld->overlayId );
522 } else
523 ld->dropWhenRelaysLeft = true;
524}
525
526// ----------------------------------------------------------------------------
527
528/// internal send message, always use this functions to send messages over links
529seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
530 logging_debug( "Sending data message on link " << link.toString() );
531
532 // get the mapping for this link
533 LinkDescriptor* ld = getDescriptor(link);
534 if( ld == NULL ) {
535 logging_error("Could not send message. "
536 << "Link not found id=" << link.toString());
537 return -1;
538 }
539
540 // check if the link is up yet, if its an auto link queue message
541 if( !ld->up ) {
542 ld->markAsUsed();
543 if( ld->autolink ) {
544 logging_info("Auto-link " << link.toString() << " not up, queue message");
545 Data data = data_serialize( message );
546 const_cast<Message*>(message)->dropPayload();
547 ld->messageQueue.push_back( new Message(data) );
548 } else {
549 logging_error("Link " << link.toString() << " not up, drop message");
550 }
551 return -1;
552 }
553
554 // compile overlay message (has service and node id)
555 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
556 overmsg.encapsulate( const_cast<Message*>(message) );
557
558 // send message over relay/direct/overlay
559 return sendMessage( &overmsg, ld );
560}
561
562seqnum_t BaseOverlay::sendMessage(const Message* message,
563 const NodeID& node, const ServiceID& service) {
564
565 // find link for node and service
566 LinkDescriptor* ld = getAutoDescriptor( node, service );
567
568 // if we found no link, create an auto link
569 if( ld == NULL ) {
570
571 // debug output
572 logging_info( "No link to send message to node "
573 << node.toString() << " found for service "
574 << service.toString() << ". Creating auto link ..."
575 );
576
577 // this will call onlinkup on us, if everything worked we now have a mapping
578 LinkID link = LinkID::create();
579
580 // call base overlay to create a link
581 link = establishLink( node, service, link );
582 ld = getDescriptor( link );
583 if( ld == NULL ) {
584 logging_error( "Failed to establish auto-link.");
585 return -1;
586 }
587 ld->autolink = true;
588
589 logging_debug( "Auto-link establishment in progress to node "
590 << node.toString() << " with link id=" << link.toString() );
591 }
592 assert(ld != NULL);
593
594 // mark the link as used, as we now send a message through it
595 ld->markAsUsed();
596
597 // send / queue message
598 return sendMessage( message, ld->overlayId );
599}
600
601// ----------------------------------------------------------------------------
602
603const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
604 const LinkID& link) const {
605
606 // return own end-point descriptor
607 if( link == LinkID::UNSPECIFIED )
608 return bc->getEndpointDescriptor();
609
610 // find link descriptor. not found -> return unspecified
611 const LinkDescriptor* ld = getDescriptor(link);
612 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED;
613
614 // return endpoint-descriptor from base communication
615 return bc->getEndpointDescriptor( ld->communicationId );
616}
617
618const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
619 const NodeID& node) const {
620
621 // return own end-point descriptor
622 if( node == nodeId || node == NodeID::UNSPECIFIED )
623 return bc->getEndpointDescriptor();
624
625 // no joined and request remote descriptor? -> fail!
626 if( overlayInterface == NULL ) {
627 logging_error( "overlay interface not set, cannot resolve endpoint" );
628 return EndpointDescriptor::UNSPECIFIED;
629 }
630
631 // resolve end-point descriptor from the base-overlay routing table
632 return overlayInterface->resolveNode( node );
633}
634
635// ----------------------------------------------------------------------------
636
637bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
638 sideport = _sideport;
639 _sideport->configure( this );
640}
641
642bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
643 sideport = &SideportListener::DEFAULT;
644}
645
646// ----------------------------------------------------------------------------
647
648bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
649 logging_debug( "binding communication listener " << listener
650 << " on serviceid " << sid.toString() );
651
652 if( communicationListeners.contains( sid ) ) {
653 logging_error( "some listener already registered for service id "
654 << sid.toString() );
655 return false;
656 }
657
658 communicationListeners.registerItem( listener, sid );
659 return true;
660}
661
662
663bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
664 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
665
666 if( !communicationListeners.contains( sid ) ) {
667 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
668 return false;
669 }
670
671 if( communicationListeners.get(sid) != listener ) {
672 logging_warn( "listener bound to service id " << sid.toString()
673 << " is different than listener trying to unbind" );
674 return false;
675 }
676
677 communicationListeners.unregisterItem( sid );
678 return true;
679}
680
681// ----------------------------------------------------------------------------
682
683bool BaseOverlay::bind(NodeListener* listener) {
684 logging_debug( "Binding node listener " << listener );
685
686 // already bound? yes-> warning
687 NodeListenerVector::iterator i =
688 find( nodeListeners.begin(), nodeListeners.end(), listener );
689 if( i != nodeListeners.end() ) {
690 logging_warn("Node listener " << listener << " is already bound!" );
691 return false;
692 }
693
694 // no-> add
695 nodeListeners.push_back( listener );
696 return true;
697}
698
699bool BaseOverlay::unbind(NodeListener* listener) {
700 logging_debug( "Unbinding node listener " << listener );
701
702 // already unbound? yes-> warning
703 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
704 if( i == nodeListeners.end() ) {
705 logging_warn( "Node listener " << listener << " is not bound!" );
706 return false;
707 }
708
709 // no-> remove
710 nodeListeners.erase( i );
711 return true;
712}
713
714// ----------------------------------------------------------------------------
715
716void BaseOverlay::onLinkUp(const LinkID& id,
717 const address_v* local, const address_v* remote) {
718 logging_debug( "Link up with base communication link id=" << id );
719
720 // get descriptor for link
721 LinkDescriptor* ld = getDescriptor(id, true);
722
723 // handle bootstrap link we initiated
724 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
725 logging_info(
726 "Join has been initiated by me and the link is now up. " <<
727 "Sending out join request for SpoVNet " << spovnetId.toString()
728 );
729
730 // send join request message
731 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
732 JoinRequest joinRequest( spovnetId, nodeId );
733 overlayMsg.encapsulate( &joinRequest );
734 bc->sendMessage( id, &overlayMsg );
735 return;
736 }
737
738 // no link found? -> link establishment from remote, add one!
739 if (ld == NULL) {
740 ld = addDescriptor( id );
741 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
742
743 // update descriptor
744 ld->fromRemote = true;
745 ld->communicationId = id;
746 ld->communicationUp = true;
747 ld->markAsUsed();
748
749 // in this case, do not inform listener, since service it unknown
750 // -> wait for update message!
751
752 // link mapping found? -> send update message with node-id and service id
753 } else {
754 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
755
756 // note: necessary to validate the link on the remote side!
757 logging_debug( "Sending out update" <<
758 " for service " << ld->service.toString() <<
759 " with local node id " << nodeId.toString() <<
760 " on link " << ld->overlayId.toString() );
761
762 // update descriptor
763 ld->markAsUsed();
764 ld->communicationUp = true;
765
766 // if link is a relayed link ->convert to direct link
767 if (ld->relay) {
768 logging_force( "Converting to direct link: " << ld );
769 ld->up = true;
770 ld->relay = false;
771 ld->localRelay = NodeID::UNSPECIFIED;
772 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
773 overMsg.setRelayLink( ld->remoteLinkId );
774 bc->sendMessage( ld->communicationId, &overMsg );
775 }
776
777 // compile and send update message
778 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
779 overlayMsg.setAutoLink( ld->autolink );
780 bc->sendMessage( ld->communicationId, &overlayMsg );
781 }
782}
783
784void BaseOverlay::onLinkDown(const LinkID& id,
785 const address_v* local, const address_v* remote) {
786
787 // erase bootstrap links
788 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
789 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
790
791 // get descriptor for link
792 LinkDescriptor* ld = getDescriptor(id, true);
793 if ( ld == NULL ) return; // not found? ->ignore!
794 logging_force( "onLinkDown descriptor: " << ld );
795
796 // inform listeners about link down
797 ld->communicationUp = false;
798 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
799 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
800
801 // delete all queued messages (auto links)
802 if( ld->messageQueue.size() > 0 ) {
803 logging_warn( "Dropping link " << id.toString() << " that has "
804 << ld->messageQueue.size() << " waiting messages" );
805 ld->flushQueue();
806 }
807
808 // erase mapping
809 eraseDescriptor(ld->overlayId);
810}
811
812void BaseOverlay::onLinkChanged(const LinkID& id,
813 const address_v* oldlocal, const address_v* newlocal,
814 const address_v* oldremote, const address_v* newremote) {
815
816 // get descriptor for link
817 LinkDescriptor* ld = getDescriptor(id, true);
818 if ( ld == NULL ) return; // not found? ->ignore!
819 logging_debug( "onLinkChanged descriptor: " << ld );
820
821 // inform listeners
822 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
823 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
824
825 // autolinks: refresh timestamp
826 ld->markAsUsed();
827}
828
829void BaseOverlay::onLinkFail(const LinkID& id,
830 const address_v* local, const address_v* remote) {
831 logging_debug( "Link fail with base communication link id=" << id );
832
833 // erase bootstrap links
834 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
835 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
836
837 // get descriptor for link
838 LinkDescriptor* ld = getDescriptor(id, true);
839 if ( ld == NULL ) return; // not found? ->ignore!
840 logging_debug( "Link failed id=" << ld->overlayId.toString() );
841
842 // inform listeners
843 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
844 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
845
846 // autolinks: refresh timestamp
847 ld->markAsUsed();
848}
849
850void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
851 const address_v* remote, const QoSParameterSet& qos) {
852 logging_debug( "Link quality changed with base communication link id=" << id );
853
854 // get descriptor for link
855 LinkDescriptor* ld = getDescriptor(id, true);
856 if ( ld == NULL ) return; // not found? ->ignore!
857 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
858
859 // autolinks: refresh timestamp
860 ld->markAsUsed();
861}
862
863bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
864 const address_v* remote ) {
865 logging_debug("Accepting link request from " << remote->to_string() );
866 return true;
867}
868
869/// handles a message from base communication
870bool BaseOverlay::receiveMessage(const Message* message,
871 const LinkID& link, const NodeID& ) {
872 // get descriptor for link
873 LinkDescriptor* ld = getDescriptor( link, true );
874
875 // link known?
876 if (ld == NULL) { // no-> handle with unspecified params
877 logging_debug("Received message from base communication, link descriptor unknown" );
878 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
879 } else { // yes -> handle with overlay link id
880 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
881 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
882 }
883}
884
885// ----------------------------------------------------------------------------
886
887/// handles a message from an overlay
888void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
889 logging_debug("Received message from overlay -- "
890 << " link id=" << link.toString()
891 << " node id=" << source.toString() );
892 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
893}
894
895// ----------------------------------------------------------------------------
896
897/// handles an incoming message
898bool BaseOverlay::handleMessage( const Message* message,
899 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
900 logging_debug( "Handling message: " << message->toString());
901
902 // decapsulate overlay message
903 OverlayMsg* overlayMsg =
904 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
905 if( overlayMsg == NULL ) return false;
906
907 // mark the link as in action
908 LinkDescriptor* ld = getDescriptor(boLink);
909 if (ld == NULL) ld = getDescriptor(bcLink, true);
910 if (ld != NULL) {
911 ld->markAsUsed();
912 ld->markAlive();
913 }
914
915 switch ( overlayMsg->getType() ) {
916 // ---------------------------------------------------------------------
917 // Handle spovnet instance join requests
918 // ---------------------------------------------------------------------
919 case OverlayMsg::typeJoinRequest: {
920
921 // decapsulate message
922 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
923 logging_info( "Received join request for spovnet " <<
924 joinReq->getSpoVNetID().toString() );
925
926 // check spovnet id
927 if( joinReq->getSpoVNetID() != spovnetId ) {
928 logging_error(
929 "Received join request for spovnet we don't handle " <<
930 joinReq->getSpoVNetID().toString() );
931 return false;
932 }
933
934 // TODO: here you can implement mechanisms to deny joining of a node
935 bool allow = true;
936 logging_info( "Sending join reply for spovnet " <<
937 spovnetId.toString() << " to node " <<
938 overlayMsg->getSourceNode().toString() <<
939 ". Result: " << (allow ? "allowed" : "denied") );
940 joiningNodes.push_back( overlayMsg->getSourceNode() );
941
942 // return overlay parameters
943 assert( overlayInterface != NULL );
944 logging_debug( "Using bootstrap end-point "
945 << getEndpointDescriptor().toString() )
946 OverlayParameterSet parameters = overlayInterface->getParameters();
947 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
948 JoinReply replyMsg( spovnetId, parameters,
949 allow, getEndpointDescriptor() );
950 retmsg.encapsulate(&replyMsg);
951 bc->sendMessage( bcLink, &retmsg );
952 return true;
953 }
954
955 // ---------------------------------------------------------------------
956 // handle replies to spovnet instance join requests
957 // ---------------------------------------------------------------------
958 case OverlayMsg::typeJoinReply: {
959
960 // decapsulate message
961 logging_debug("received join reply message");
962 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
963
964 // correct spovnet?
965 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
966 logging_error( "Received SpoVNet join reply for " <<
967 replyMsg->getSpoVNetID().toString() <<
968 " != " << spovnetId.toString() );
969 return false;
970 }
971
972 // access granted? no -> fail
973 if( !replyMsg->getJoinAllowed() ) {
974 logging_error( "Our join request has been denied" );
975
976 // drop initiator link
977
978 if(bcLink != LinkID::UNSPECIFIED){
979 bc->dropLink( bcLink );
980
981 vector<LinkID>::iterator it = std::find(
982 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
983 if( it != bootstrapLinks.end() )
984 bootstrapLinks.erase(it);
985 }
986
987 // inform all registered services of the event
988 BOOST_FOREACH( NodeListener* i, nodeListeners )
989 i->onJoinFailed( spovnetId );
990
991 return true;
992 }
993
994 // access has been granted -> continue!
995 logging_info("Join request has been accepted for spovnet " <<
996 spovnetId.toString() );
997
998 logging_debug( "Using bootstrap end-point "
999 << replyMsg->getBootstrapEndpoint().toString() );
1000
1001 //
1002 // create overlay structure from spovnet parameter set
1003 // if we have not boostrapped yet against some other node
1004 //
1005
1006 if( overlayInterface == NULL ){
1007
1008 logging_debug("first-time bootstrapping");
1009
1010 overlayInterface = OverlayFactory::create(
1011 *this, replyMsg->getParam(), nodeId, this );
1012
1013 // overlay structure supported? no-> fail!
1014 if( overlayInterface == NULL ) {
1015 logging_error( "overlay structure not supported" );
1016
1017 if(bcLink != LinkID::UNSPECIFIED){
1018 bc->dropLink( bcLink );
1019
1020 vector<LinkID>::iterator it = std::find(
1021 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1022 if( it != bootstrapLinks.end() )
1023 bootstrapLinks.erase(it);
1024 }
1025
1026 // inform all registered services of the event
1027 BOOST_FOREACH( NodeListener* i, nodeListeners )
1028 i->onJoinFailed( spovnetId );
1029
1030 return true;
1031 }
1032
1033 // everything ok-> join the overlay!
1034 state = BaseOverlayStateCompleted;
1035 overlayInterface->createOverlay();
1036
1037 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1038
1039 // update ovlvis
1040 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1041
1042 // inform all registered services of the event
1043 BOOST_FOREACH( NodeListener* i, nodeListeners )
1044 i->onJoinCompleted( spovnetId );
1045
1046 } else {
1047
1048 // this is not the first bootstrap, just join the additional node
1049 logging_debug("not first-time bootstrapping");
1050 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1051
1052 } // if( overlayInterface == NULL )
1053
1054 return true;
1055 }
1056
1057 // ---------------------------------------------------------------------
1058 // handle data forward messages
1059 // ---------------------------------------------------------------------
1060 case OverlayMsg::typeData: {
1061
1062 // get service
1063 const ServiceID& service = overlayMsg->getService();
1064 logging_debug( "received data for service " << service.toString() );
1065
1066 // find listener
1067 CommunicationListener* listener =
1068 communicationListeners.get( service );
1069 if( listener == NULL ) return true;
1070
1071 // delegate data message
1072 listener->onMessage( overlayMsg,
1073 overlayMsg->getSourceNode(), ld->overlayId );
1074
1075 return true;
1076 }
1077
1078 // ---------------------------------------------------------------------
1079 // handle update messages for link establishment
1080 // ---------------------------------------------------------------------
1081 case OverlayMsg::typeUpdate: {
1082 logging_debug("Received type update message on link " << ld );
1083
1084 // get info
1085 const NodeID& sourcenode = overlayMsg->getSourceNode();
1086 const ServiceID& service = overlayMsg->getService();
1087
1088 // no link descriptor available -> error!
1089 if( ld == NULL ) {
1090 logging_warn( "received overlay update message for link " <<
1091 ld->overlayId.toString() << " for which we have no mapping" );
1092 return false;
1093 }
1094
1095 // update our link mapping information for this link
1096 bool changed =
1097 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1098 ld->remoteNode = sourcenode;
1099 ld->service = service;
1100 ld->autolink = overlayMsg->isAutoLink();
1101
1102 // if our link information changed, we send out an update, too
1103 if( changed ) {
1104 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1105 overMsg.setAutoLink(ld->autolink);
1106 bc->sendMessage( ld->communicationId, &overMsg );
1107 }
1108
1109 // service registered? no-> error!
1110 if( !communicationListeners.contains( service ) ) {
1111 logging_warn( "Link up: event listener has not been registered" );
1112 return false;
1113 }
1114
1115 // default or no service registered?
1116 CommunicationListener* listener = communicationListeners.get( service );
1117 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1118 logging_warn("Link up: event listener is default or null!" );
1119 return true;
1120 }
1121
1122 // update descriptor
1123 ld->listener = listener;
1124 ld->markAsUsed();
1125 ld->markAlive();
1126
1127 // ask the service whether it wants to accept this link
1128 if( !listener->onLinkRequest(sourcenode) ) {
1129
1130 logging_debug("Link id=" << ld->overlayId.toString() <<
1131 " has been denied by service " << service.toString() << ", dropping link");
1132
1133 // prevent onLinkDown calls to the service
1134 ld->listener = &CommunicationListener::DEFAULT;
1135
1136 // drop the link
1137 dropLink( ld->overlayId );
1138 return true;
1139 }
1140
1141 // set link up
1142 ld->up = true;
1143 logging_debug(
1144 "Link " << ld->overlayId.toString()
1145 << " has been accepted by service " << service.toString()
1146 << " and is now up"
1147 );
1148
1149 // auto links: link has been accepted -> send queued messages
1150 if( ld->messageQueue.size() > 0 ) {
1151 logging_info( "sending out queued messages on link " <<
1152 ld->overlayId.toString() );
1153 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1154 sendMessage( msg, ld->overlayId );
1155 delete msg;
1156 }
1157 ld->messageQueue.clear();
1158 }
1159
1160 // call the notification functions
1161 listener->onLinkUp( ld->overlayId, sourcenode );
1162 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1163
1164 return true;
1165 }
1166
1167 // ---------------------------------------------------------------------
1168 // handle link request forwarded through the overlay
1169 // ---------------------------------------------------------------------
1170 case OverlayMsg::typeLinkRequest: {
1171
1172 logging_debug( "received link request on link" );
1173
1174 // decapsulate message
1175 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1176 const ServiceID& service = overlayMsg->getService();
1177
1178 // is request reply?
1179 if ( linkReq->isReply() ) {
1180
1181 // find link
1182 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1183 if ( i == pendingLinks.end() ) {
1184 logging_error( "Nonce not found in link request" );
1185 return true;
1186 }
1187
1188 // debug message
1189 logging_debug( "Link request reply received. Establishing link "
1190 << i->second << " to " << (linkReq->getEndpoint()->toString())
1191 << " for service " << service.toString()
1192 << " with nonce " << linkReq->getNonce()
1193 << " using relay " << linkReq->getRelay().toString()
1194 << " and remote link id=" << linkReq->getRemoteLinkId()
1195 );
1196
1197 // get descriptor
1198 LinkDescriptor* ldn = getDescriptor(i->second);
1199
1200 // check if link request reply has a relay node ...
1201 if (!linkReq->getRelay().isUnspecified()) { // yes->
1202 ldn->up = true;
1203 ldn->relay = true;
1204 if (ldn->localRelay.isUnspecified()) {
1205 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1206 showLinkState();
1207 }
1208 ldn->remoteRelay = linkReq->getRelay();
1209 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1210 ldn->remoteNode = overlayMsg->getSourceNode();
1211
1212 ldn->markAlive();
1213
1214 // compile and send update message
1215 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1216 _overlayMsg.setAutoLink(ldn->autolink);
1217 sendMessage( &_overlayMsg, ldn );
1218
1219 // auto links: link has been accepted -> send queued messages
1220 if( ldn->messageQueue.size() > 0 ) {
1221 logging_info( "Sending out queued messages on link " <<
1222 ldn->overlayId.toString() );
1223 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1224 sendMessage( msg, ldn->overlayId );
1225 delete msg;
1226 }
1227 ldn->messageQueue.clear();
1228 }
1229
1230 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1231
1232 // try to establish a direct link
1233 ldn->communicationId =
1234 bc->establishLink( *linkReq->getEndpoint(), i->second );
1235 }
1236
1237 // no relay node-> use overlay routing
1238 else {
1239 ldn->up = true;
1240
1241 // establish direct link
1242 ldn->communicationId =
1243 bc->establishLink( *linkReq->getEndpoint(), i->second );
1244 }
1245 } else {
1246 logging_debug( "Link request received from node id="
1247 << overlayMsg->getSourceNode() );
1248
1249 // create link descriptor
1250 LinkDescriptor* ldn =
1251 createLinkDescriptor(overlayMsg->getSourceNode(),
1252 overlayMsg->getService(), LinkID::UNSPECIFIED );
1253 assert(!ldn->overlayId.isUnspecified());
1254
1255 // create reply message
1256 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1257 LinkRequest link_request_msg(
1258 linkReq->getNonce(),
1259 &bc->getEndpointDescriptor(),
1260 true, ldn->overlayId, ldn->localRelay
1261 );
1262 overlay_msg.encapsulate( &link_request_msg );
1263
1264 // debug message
1265 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1266 linkReq->getNonce() );
1267
1268 // if this is a relay link-> update information & inform listeners
1269 if (!linkReq->getRelay().isUnspecified()) {
1270 // set flags
1271 ldn->up = true;
1272 ldn->relay = true;
1273 if (ldn->localRelay.isUnspecified()) {
1274 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1275 showLinkState();
1276 }
1277 ldn->remoteRelay = linkReq->getRelay();
1278 ldn->remoteNode = overlayMsg->getSourceNode();
1279 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1280 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1281 }
1282
1283 // route message back over overlay
1284 sendMessage( &overlay_msg, ldn );
1285 }
1286 return true;
1287 }
1288
1289 // ---------------------------------------------------------------------
1290 // handle relay message to forward messages
1291 // ---------------------------------------------------------------------
1292 case OverlayMsg::typeRelay: {
1293
1294 logging_debug( "received relay request on link" );
1295
1296 // decapsulate message
1297 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1298
1299 // is relay message informative?
1300 switch (relayMsg->getType()) {
1301
1302 // handle relay notification
1303 case RelayMessage::typeInform: {
1304 logging_info("Received relay information message with"
1305 << " relay " << relayMsg->getRelayNode()
1306 << " destination " << relayMsg->getDestNode() );
1307
1308 // mark incoming link as relay
1309 if (ld!=NULL) ld->markAsRelay();
1310
1311 // am I the destination of this message? yes->
1312 if (relayMsg->getDestNode() == nodeId ) {
1313 // deliver relay message locally!
1314 logging_debug("Relay message reached destination. Handling the message.");
1315 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1316 return true;
1317 }
1318
1319 // create route message
1320 OverlayMsg _overMsg( *overlayMsg );
1321 RelayMessage _relayMsg( *relayMsg );
1322 _relayMsg.setType( RelayMessage::typeRoute );
1323 _overMsg.encapsulate( &_relayMsg );
1324
1325 // forward message
1326 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1327 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1328 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1329 } else {
1330 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1331 overlayInterface->routeMessage(relayMsg->getRelayNode(), &_overMsg );
1332 }
1333 return true;
1334 }
1335
1336 // handle relay routing
1337 case RelayMessage::typeRoute: {
1338 logging_info("Received relay route message with"
1339 << " relay " << relayMsg->getRelayNode()
1340 << " destination " << relayMsg->getDestNode() );
1341
1342 // mark incoming link as relay
1343 if (ld!=NULL) ld->markAsRelay();
1344
1345 // am I the destination of this message? yes->
1346 if (relayMsg->getDestNode() == nodeId ) {
1347 // deliver relay message locally!
1348 logging_debug("Relay message reached destination. Handling the message.");
1349 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1350 return true;
1351 }
1352
1353 // am I the relay for this message? yes->
1354 if (relayMsg->getRelayNode() == nodeId ) {
1355 logging_debug("I'm the relay for this message. Sending to destination.");
1356 OverlayMsg _overMsg( *overlayMsg );
1357 RelayMessage _relayMsg( *relayMsg );
1358 _overMsg.encapsulate(&_relayMsg);
1359
1360 /// this must be handled by using relay link!
1361 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1362 return true;
1363 }
1364
1365 // error: I'm not a relay or destination!
1366 logging_error("This node is neither relay nor destination. Dropping Message!");
1367 return true;
1368 }
1369 default: {
1370 logging_error("RelayMessage Unknown!");
1371 return true;
1372 }
1373 }
1374
1375 break;
1376 }
1377
1378 // ---------------------------------------------------------------------
1379 // handle keep-alive messages
1380 // ---------------------------------------------------------------------
1381 case OverlayMsg::typeKeepAlive: {
1382
1383 logging_debug( "received keep-alive on link" );
1384
1385 if ( ld != NULL ) {
1386 //logging_force("Keep-Alive for "<< ld->overlayId);
1387 ld->markAlive();
1388 }
1389 break;
1390 }
1391
1392 // ---------------------------------------------------------------------
1393 // handle direct link replacement messages
1394 // ---------------------------------------------------------------------
1395 case OverlayMsg::typeDirectLink: {
1396
1397 logging_debug( "received direct link replacement request" );
1398
1399 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1400 logging_force( "Received direct link convert notification for " << rld );
1401 rld->communicationId = ld->communicationId;
1402 rld->communicationUp = true;
1403 rld->relay = false;
1404 rld->localRelay = NodeID::UNSPECIFIED;
1405 rld->remoteRelay = NodeID::UNSPECIFIED;
1406 eraseDescriptor(ld->overlayId);
1407 break;
1408 }
1409
1410 // ---------------------------------------------------------------------
1411 // handle unknown message type
1412 // ---------------------------------------------------------------------
1413 default: {
1414 logging_error( "received message in invalid state! don't know " <<
1415 "what to do with this message of type " <<
1416 overlayMsg->getType() );
1417 return false;
1418 }
1419
1420 } /* switch */
1421
1422 return false;
1423}
1424
1425// ----------------------------------------------------------------------------
1426
1427void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1428
1429 logging_debug( "broadcasting message to all known nodes " <<
1430 "in the overlay from service " + service.toString() );
1431
1432 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
1433 OverlayInterface::NodeList::iterator i = nodes.begin();
1434 for(; i != nodes.end(); i++ ) {
1435 if( *i == nodeId) continue; // don't send to ourselfs
1436 sendMessage( message, *i, service );
1437 }
1438}
1439
1440vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1441 // the known nodes _can_ also include our node, so we remove ourself
1442 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1443 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1444 if( i != nodes.end() ) nodes.erase( i );
1445 return nodes;
1446}
1447
1448const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1449 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1450 const LinkDescriptor* ld = getDescriptor(lid);
1451 if( ld == NULL ) return NodeID::UNSPECIFIED;
1452 else return ld->remoteNode;
1453}
1454
1455vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1456 vector<LinkID> linkvector;
1457 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1458 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1459 linkvector.push_back( ld->overlayId );
1460 }
1461 }
1462 return linkvector;
1463}
1464
1465
1466void BaseOverlay::onNodeJoin(const NodeID& node) {
1467 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1468 if( i == joiningNodes.end() ) return;
1469
1470 logging_info( "node has successfully joined baseoverlay and overlay structure "
1471 << node.toString() );
1472
1473 joiningNodes.erase( i );
1474}
1475
1476void BaseOverlay::eventFunction() {
1477
1478 // send keep-alive messages over established links
1479 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1480 if (!ld->up) continue;
1481 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1482 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1483 sendMessage( &overMsg, ld );
1484 }
1485
1486 // iterate over all links and check for time boundaries
1487 vector<LinkDescriptor*> oldlinks;
1488 time_t now = time(NULL);
1489 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1490 // remote used as relay flag
1491 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1492 ld->usedAsRelay = false;
1493
1494 // keep alives missed? yes->
1495 if ( !ld->up && difftime( now, ld->keepAliveTime ) > 2 ) {
1496
1497 // increase counter
1498 ld->keepAliveMissed++;
1499
1500 // missed more than four keep-alive messages (4 sec)? -> drop link
1501 if (ld->keepAliveMissed > 10) {
1502 logging_force( "Link connection request is stale, closing: " << ld );
1503 oldlinks.push_back( ld );
1504 }
1505 }
1506
1507 if (!ld->up) continue;
1508
1509 // drop links that are dropped and not used as relay
1510 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink)
1511 oldlinks.push_back( ld );
1512 else
1513
1514 // auto-link time exceeded?
1515 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 )
1516 oldlinks.push_back( ld );
1517
1518 else
1519
1520 // keep alives missed? yes->
1521 if ( !ld->autolink && difftime( now, ld->keepAliveTime ) > 2 ) {
1522
1523 // increase counter
1524 ld->keepAliveMissed++;
1525
1526 // missed more than four keep-alive messages (4 sec)? -> drop link
1527 if (ld->keepAliveMissed >= 8) {
1528 logging_force( "Link is stale, closing: " << ld );
1529 oldlinks.push_back( ld );
1530 }
1531 }
1532 }
1533
1534 // show link state
1535 counter++;
1536 if (counter>=4) showLinkState();
1537 if (counter>=4 || counter<0) counter = 0;
1538
1539 // drop links
1540 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1541
1542 vector<LinkID>::iterator it = std::find(
1543 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1544
1545 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1546 logging_force( "Not dropping initiator link: " << ld );
1547 continue;
1548 }
1549 logging_force( "Link timed out. Dropping " << ld );
1550 dropLink( ld->overlayId );
1551 }
1552}
1553
1554void BaseOverlay::showLinkState() {
1555 int i=0;
1556 logging_force("--- link state -------------------------------");
1557 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1558 logging_force("link " << i << ": " << ld);
1559 i++;
1560 }
1561 logging_force("----------------------------------------------");
1562}
1563
1564}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.