close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 5285

Last change on this file since 5285 was 5284, checked in by mies, 15 years ago

+ added new transport modules and adapted ariba to them
+ exchange endpoint descriptors an link establishment
+ clean up of base communication
+ link establishment with in the presence of multiple endpoints
+ local discovery for ipv6, ipv4 and bluetooth mac addresses

File size: 47.9 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
// Unconditional console tracing: streams the expression `x` to std::cout and
// ends the line with std::endl. The expansion already carries its own
// terminating semicolon. logging_force1 is an alias with the same expansion.
#define logging_force(x) std::cout << x << std::endl;
#define logging_force1(x) logging_force(x)
64
65LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
66 BOOST_FOREACH( LinkDescriptor* lp, links )
67 if ((communication ? lp->communicationId : lp->overlayId) == link)
68 return lp;
69 return NULL;
70}
71
72const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
73 BOOST_FOREACH( const LinkDescriptor* lp, links )
74 if ((communication ? lp->communicationId : lp->overlayId) == link)
75 return lp;
76 return NULL;
77}
78
79LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
80 BOOST_FOREACH( LinkDescriptor* lp, links )
81 if (lp->autolink && lp->remoteNode == node && lp->service == service)
82 return lp;
83 return NULL;
84}
85
86void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
87 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
88 LinkDescriptor* ld = *i;
89 if ((communication ? ld->communicationId : ld->overlayId) == link) {
90 delete ld;
91 links.erase(i);
92 break;
93 }
94 }
95}
96
97LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
98 LinkDescriptor* desc = getDescriptor( link );
99 if ( desc == NULL ) {
100 desc = new LinkDescriptor();
101 desc->overlayId = link;
102 links.push_back(desc);
103 }
104 return desc;
105}
106
107/// returns a direct link relay descriptor to the given relay node
108LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) {
109 BOOST_FOREACH( LinkDescriptor* lp, links )
110 if (lp->remoteNode == relayNode &&
111 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
112 lp->relay == false &&
113 lp->up)
114 return lp;
115 return NULL;
116}
117
118/// find a proper relay node
119const NodeID BaseOverlay::findRelayNode( const NodeID& id ) {
120 LinkDescriptor* rld = NULL;
121 NodeID relayNode = NodeID::UNSPECIFIED;
122
123 // get used next hop towards node
124 LinkID rlid = overlayInterface->getNextLinkId(id);
125 while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
126
127 // get descriptor of first hop
128 rld = getDescriptor(rlid);
129 logging_force1( rld );
130
131 // is first hop a relay path? yes-> try to find real link!
132 if ( rld->relay )
133 relayNode = getRelayDescriptor(rld->localRelay)->remoteNode;
134
135 // no-> a proper relay node has been found
136 else relayNode = rld->remoteNode;
137 }
138 logging_force1( "Potential relay node " << relayNode.toString() );
139 // do not return myself or use the node as relay node
140 if (relayNode == nodeId)
141 return NodeID::UNSPECIFIED;
142 else {
143 logging_force1( "Returning relay node " << relayNode.toString() );
144 return relayNode;
145 }
146}
147
148/// forwards a message over relays/overlay/directly using link descriptor
149seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
150
151 // directly send message
152 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
153 logging_debug("sendMessage: Sending message via Base Communication");
154 return bc->sendMessage( ld->communicationId, message );
155 }
156
157 // relay message
158 else if ( ld->relay ) {
159 logging_debug("sendMessage: Relaying message to node "
160 << ld->remoteNode.toString()
161 << " using relay " << ld->localRelay
162 );
163
164 // get local relay link descriptor and mark as used for relaying
165 LinkDescriptor* rld = getRelayDescriptor(ld->localRelay);
166 if (rld==NULL) {
167 logging_error("sendMessage: Relay descriptor for relay " <<
168 ld->localRelay.toString() << " unknown.");
169 return -1;
170 }
171 rld->markAsRelay();
172
173 // create a information relay message to inform the relay about
174 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
175 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
176 relayMsg.encapsulate( message );
177 overlay_msg.encapsulate( &relayMsg );
178
179 // route message to relay node in order to inform it!
180 logging_debug("sendMessage: Sending message over relayed link with" << ld );
181 overlayInterface->routeMessage(rld->remoteNode, rld->overlayId, &overlay_msg);
182 return 0;
183 }
184
185 // route message using overlay
186 else {
187 logging_error("Could not send message descriptor=" << ld );
188 logging_debug( "sendMessage: Routing message to node " << ld->remoteNode.toString() );
189 overlayInterface->routeMessage( ld->remoteNode, message );
190 return 0;
191 }
192
193 return -1;
194}
195
196/// creates a link descriptor, apply relay semantics if possible
197LinkDescriptor* BaseOverlay::createLinkDescriptor(
198 const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ) {
199
200 // find listener
201 if( !communicationListeners.contains( service ) ) {
202 logging_error( "No listener found for service " << service.toString() );
203 return NULL;
204 }
205 CommunicationListener* listener = communicationListeners.get( service );
206 assert( listener != NULL );
207
208 // copy link id
209 LinkID linkid = link_id;
210
211 // create link id if necessary
212 if ( linkid.isUnspecified() )
213 linkid = LinkID::create();
214
215 // create relay link descriptor
216 NodeID relayNode = findRelayNode(remoteNode);
217
218 // add descriptor
219 LinkDescriptor* ld = addDescriptor( linkid );
220 ld->overlayId = linkid;
221 ld->service = service;
222 ld->listener = listener;
223 ld->remoteNode = remoteNode;
224
225 // set relay node if available
226 ld->relay = !relayNode.isUnspecified();
227 ld->localRelay = relayNode;
228
229 if (!ld->relay)
230 logging_error("No relay found!");
231
232 // debug output
233 logging_debug( "Created link descriptor: " << ld );
234
235 return ld;
236}
237
238
239// ----------------------------------------------------------------------------
240
// Logging setup for this class; presumably defines the logger that the
// logging_* macros used in this file write to -- confirm against the
// logging header.
use_logging_cpp(BaseOverlay);
242
243// ----------------------------------------------------------------------------
244
245BaseOverlay::BaseOverlay() :
246 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
247 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED),
248 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT) {
249}
250
251BaseOverlay::~BaseOverlay() {
252}
253
254// ----------------------------------------------------------------------------
255
256void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
257 logging_info("Starting...");
258
259 // set parameters
260 bc = &_basecomm;
261 nodeId = _nodeid;
262
263 // register at base communication
264 bc->registerMessageReceiver( this );
265 bc->registerEventListener( this );
266
267 // timer for auto link management
268 Timer::setInterval( 500 );
269 Timer::start();
270}
271
272void BaseOverlay::stop() {
273 logging_info("Stopping...");
274
275 // stop timer
276 Timer::stop();
277
278 // delete oberlay interface
279 if(overlayInterface != NULL) {
280 delete overlayInterface;
281 overlayInterface = NULL;
282 }
283
284 // unregister at base communication
285 bc->unregisterMessageReceiver( this );
286 bc->unregisterEventListener( this );
287}
288
289// ----------------------------------------------------------------------------
290
291void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
292 const EndpointDescriptor& bootstrapEp) {
293
294 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
295 logging_info( "Starting to join spovnet " << id.toString() <<
296 " with nodeid " << nodeId.toString());
297
298 // contact the spovnet initiator and request to join. if the join is granted we will
299 // receive further information on the structure of the overlay that is used in the spovnet
300 // but first, we have to establish a link to the initiator...
301 spovnetId = id;
302 state = BaseOverlayStateJoinInitiated;
303
304 initiatorLink = bc->establishLink( bootstrapEp );
305 logging_info("join process initiated for " << id.toString() << "...");
306}
307
308void BaseOverlay::leaveSpoVNet() {
309
310 logging_info( "Leaving spovnet " << spovnetId );
311 bool ret = ( state != this->BaseOverlayStateInvalid );
312
313 logging_debug( "Dropping all auto-links" );
314
315 // gather all service links
316 vector<LinkID> servicelinks;
317 BOOST_FOREACH( LinkDescriptor* ld, links ) {
318 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
319 servicelinks.push_back( ld->overlayId );
320 }
321
322 // drop all service links
323 BOOST_FOREACH( LinkID lnk, servicelinks )
324 dropLink( lnk );
325
326 // let the node leave the spovnet overlay interface
327 logging_debug( "Leaving overlay" );
328 if( overlayInterface != NULL )
329 overlayInterface->leaveOverlay();
330
331 // leave spovnet
332 if( state != BaseOverlayStateInitiator ) {
333 // then, leave the spovnet baseoverlay
334 OverlayMsg overMsg( OverlayMsg::typeBye, nodeId );
335 bc->sendMessage( initiatorLink, &overMsg );
336
337 // drop the link and set to correct state
338 bc->dropLink( initiatorLink );
339 initiatorLink = LinkID::UNSPECIFIED;
340 }
341
342 // change to inalid state
343 state = BaseOverlayStateInvalid;
344 ovl.visShutdown( ovlId, nodeId, string("") );
345
346 // inform all registered services of the event
347 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
348 if( ret ) i->onLeaveCompleted( spovnetId );
349 else i->onLeaveFailed( spovnetId );
350 }
351}
352
353void BaseOverlay::createSpoVNet(const SpoVNetID& id,
354 const OverlayParameterSet& param,
355 const SecurityParameterSet& sec,
356 const QoSParameterSet& qos) {
357
358 // set the state that we are an initiator, this way incoming messages are
359 // handled correctly
360 logging_info( "creating spovnet " + id.toString() <<
361 " with nodeid " << nodeId.toString() );
362
363 spovnetId = id;
364 state = BaseOverlayStateInitiator;
365
366 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
367 if( overlayInterface == NULL ) {
368 logging_fatal( "overlay structure not supported" );
369 state = BaseOverlayStateInvalid;
370 return;
371 }
372
373 // bootstrap against ourselfs
374 overlayInterface->joinOverlay();
375 BOOST_FOREACH( NodeListener* i, nodeListeners )
376 i->onJoinCompleted( spovnetId );
377
378 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
379 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
380}
381
382// ----------------------------------------------------------------------------
383
384const LinkID BaseOverlay::establishLink(
385 const EndpointDescriptor& ep, const NodeID& nodeid,
386 const ServiceID& service, const LinkID& linkid ) {
387
388 LinkID link_id = linkid;
389
390 // establish link via overlay
391 if (!nodeid.isUnspecified())
392 link_id = establishLink( nodeid, service, link_id );
393
394 // establish link directly if only ep is known
395 if (nodeid.isUnspecified())
396 establishLink( ep, service, link_id );
397
398 return link_id;
399}
400
401/// call base communication's establish link and add link mapping
402const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
403 const ServiceID& service, const LinkID& linkid ) {
404
405 // create a new link id if necessary
406 LinkID link_id = linkid;
407 if (link_id.isUnspecified()) link_id = LinkID::create();
408
409 /// find a service listener
410 if( !communicationListeners.contains( service ) ) {
411 logging_error( "No listener registered for service id=" << service.toString() );
412 return LinkID::UNSPECIFIED;
413 }
414 CommunicationListener* listener = communicationListeners.get( service );
415 assert( listener != NULL );
416
417 /// establish link and add mapping
418 logging_info("Establishing direct link " << link_id.toString()
419 << " using " << ep.toString());
420
421 // create descriptor
422 LinkDescriptor* ld = addDescriptor( link_id );
423 ld->overlayId = link_id;
424 ld->communicationId = link_id;
425 ld->listener = listener;
426 ld->service = service;
427 bc->establishLink( ep, link_id );
428
429 return link_id;
430}
431
432/// establishes a link between two arbitrary nodes
433const LinkID BaseOverlay::establishLink( const NodeID& node,
434 const ServiceID& service, const LinkID& link_id ) {
435
436 // do not establish a link to myself!
437 if (node == nodeId) return LinkID::UNSPECIFIED;
438
439 // create a link descriptor
440 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
441
442 // create link request message with own link id
443 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
444 LinkRequest link_request_msg(
445 nonce, &bc->getEndpointDescriptor(), false,
446 ld->overlayId, ld->localRelay );
447 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
448 overlay_msg.encapsulate( &link_request_msg );
449 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
450
451 // debug message
452 logging_debug(
453 "Sending link request with"
454 << " link id=" << ld->overlayId
455 << " node id=" << ld->remoteNode.toString()
456 << " service id=" << ld->service.toString()
457 << " local relay id=" << ld->localRelay.toString()
458 << " nonce= " << nonce
459 );
460
461 // sending message through new link
462 sendMessage( &overlay_msg, ld );
463
464 return ld->overlayId;
465}
466
467/// drops an established link
468void BaseOverlay::dropLink(const LinkID& link) {
469 logging_debug( "Dropping link (initiated locally):" << link.toString() );
470
471 // find the link item to drop
472 LinkDescriptor* ld = getDescriptor(link);
473 if( ld == NULL ) {
474 logging_warn( "Can't drop link, link is unknown!");
475 return;
476 }
477
478 // delete all queued messages
479 if( ld->messageQueue.size() > 0 ) {
480 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
481 << ld->messageQueue.size() << " waiting messages" );
482 ld->flushQueue();
483 }
484
485 // inform sideport and listener
486 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
487 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
488
489 // do not drop relay links
490 if (!ld->usedAsRelay) {
491 // drop the link in base communication
492 if (ld->communicationUp) bc->dropLink( ld->communicationId );
493
494 // erase descriptor
495 eraseDescriptor( ld->overlayId );
496 } else
497 ld->dropWhenRelaysLeft = true;
498}
499
500// ----------------------------------------------------------------------------
501
502/// internal send message, always use this functions to send messages over links
503seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
504 logging_debug( "Sending data message on link " << link.toString() );
505
506 // get the mapping for this link
507 LinkDescriptor* ld = getDescriptor(link);
508 if( ld == NULL ) {
509 logging_error("Could not send message. "
510 << "Link not found id=" << link.toString());
511 return -1;
512 }
513
514 // check if the link is up yet, if its an auto link queue message
515 if( !ld->up ) {
516 ld->markAsUsed();
517 if( ld->autolink ) {
518 logging_info("Auto-link " << link.toString() << " not up, queue message");
519 Data data = data_serialize( message );
520 const_cast<Message*>(message)->dropPayload();
521 ld->messageQueue.push_back( new Message(data) );
522 } else {
523 logging_error("Link " << link.toString() << " not up, drop message");
524 }
525 return -1;
526 }
527
528 // compile overlay message (has service and node id)
529 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
530 overmsg.encapsulate( const_cast<Message*>(message) );
531
532 // send message over relay/direct/overlay
533 return sendMessage( &overmsg, ld );
534}
535
536seqnum_t BaseOverlay::sendMessage(const Message* message,
537 const NodeID& node, const ServiceID& service) {
538
539 // find link for node and service
540 LinkDescriptor* ld = getAutoDescriptor( node, service );
541
542 // if we found no link, create an auto link
543 if( ld == NULL ) {
544
545 // debug output
546 logging_info( "No link to send message to node "
547 << node.toString() << " found for service "
548 << service.toString() << ". Creating auto link ..."
549 );
550
551 // this will call onlinkup on us, if everything worked we now have a mapping
552 LinkID link = LinkID::create();
553
554 // call base overlay to create a link
555 link = establishLink( node, service, link );
556 ld = getDescriptor( link );
557 if( ld == NULL ) {
558 logging_error( "Failed to establish auto-link.");
559 return -1;
560 }
561 ld->autolink = true;
562
563 logging_debug( "Auto-link establishment in progress to node "
564 << node.toString() << " with link id=" << link.toString() );
565 }
566 assert(ld != NULL);
567
568 // mark the link as used, as we now send a message through it
569 ld->markAsUsed();
570
571 // send / queue message
572 return sendMessage( message, ld->overlayId );
573}
574
575// ----------------------------------------------------------------------------
576
577const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
578 const LinkID& link) const {
579
580 // return own end-point descriptor
581 if( link == LinkID::UNSPECIFIED )
582 return bc->getEndpointDescriptor();
583
584 // find link descriptor. not found -> return unspecified
585 const LinkDescriptor* ld = getDescriptor(link);
586 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED;
587
588 // return endpoint-descriptor from base communication
589 return bc->getEndpointDescriptor( ld->communicationId );
590}
591
592const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
593 const NodeID& node) const {
594
595 // return own end-point descriptor
596 if( node == nodeId || node == NodeID::UNSPECIFIED )
597 return bc->getEndpointDescriptor();
598
599 // no joined and request remote descriptor? -> fail!
600 if( overlayInterface == NULL ) {
601 logging_error( "overlay interface not set, cannot resolve endpoint" );
602 return EndpointDescriptor::UNSPECIFIED;
603 }
604
605 // resolve end-point descriptor from the base-overlay routing table
606 return overlayInterface->resolveNode( node );
607}
608
609// ----------------------------------------------------------------------------
610
611bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
612 sideport = _sideport;
613 _sideport->configure( this );
614}
615
616bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
617 sideport = &SideportListener::DEFAULT;
618}
619
620// ----------------------------------------------------------------------------
621
622bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
623 logging_debug( "binding communication listener " << listener
624 << " on serviceid " << sid.toString() );
625
626 if( communicationListeners.contains( sid ) ) {
627 logging_error( "some listener already registered for service id "
628 << sid.toString() );
629 return false;
630 }
631
632 communicationListeners.registerItem( listener, sid );
633 return true;
634}
635
636
637bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
638 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
639
640 if( !communicationListeners.contains( sid ) ) {
641 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
642 return false;
643 }
644
645 if( communicationListeners.get(sid) != listener ) {
646 logging_warn( "listener bound to service id " << sid.toString()
647 << " is different than listener trying to unbind" );
648 return false;
649 }
650
651 communicationListeners.unregisterItem( sid );
652 return true;
653}
654
655// ----------------------------------------------------------------------------
656
657bool BaseOverlay::bind(NodeListener* listener) {
658 logging_debug( "Binding node listener " << listener );
659
660 // already bound? yes-> warning
661 NodeListenerVector::iterator i =
662 find( nodeListeners.begin(), nodeListeners.end(), listener );
663 if( i != nodeListeners.end() ) {
664 logging_warn("Node listener " << listener << " is already bound!" );
665 return false;
666 }
667
668 // no-> add
669 nodeListeners.push_back( listener );
670 return true;
671}
672
673bool BaseOverlay::unbind(NodeListener* listener) {
674 logging_debug( "Unbinding node listener " << listener );
675
676 // already unbound? yes-> warning
677 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
678 if( i == nodeListeners.end() ) {
679 logging_warn( "Node listener " << listener << " is not bound!" );
680 return false;
681 }
682
683 // no-> remove
684 nodeListeners.erase( i );
685 return true;
686}
687
688// ----------------------------------------------------------------------------
689
690void BaseOverlay::onLinkUp(const LinkID& id,
691 const address_v* local, const address_v* remote) {
692 logging_debug( "Link up with base communication link id=" << id );
693
694 // get descriptor for link
695 LinkDescriptor* ld = getDescriptor(id, true);
696
697 // handle initiator link
698 if(state == BaseOverlayStateJoinInitiated && id == initiatorLink) {
699 logging_info(
700 "Join has been initiated by me and the link is now up. " <<
701 "Sending out join request for SpoVNet " << spovnetId.toString()
702 );
703
704 // send join request message
705 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
706 JoinRequest joinRequest( spovnetId, nodeId );
707 overlayMsg.encapsulate( &joinRequest );
708 bc->sendMessage( id, &overlayMsg );
709 return;
710 }
711
712 // no link found? -> link establishment from remote, add one!
713 if (ld == NULL) {
714 ld = addDescriptor( id );
715 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
716
717 // update descriptor
718 ld->fromRemote = true;
719 ld->communicationId = id;
720 ld->communicationUp = true;
721 ld->markAsUsed();
722
723 // in this case, do not inform listener, since service it unknown
724 // -> wait for update message!
725
726 // link mapping found? -> send update message with node-id and service id
727 } else {
728 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
729
730 // note: necessary to validate the link on the remote side!
731 logging_debug( "Sending out update" <<
732 " for service " << ld->service.toString() <<
733 " with local node id " << nodeId.toString() <<
734 " on link " << ld->overlayId.toString() );
735
736 // update descriptor
737 ld->markAsUsed();
738 ld->communicationUp = true;
739
740 // if link is a relayed link ->convert to direct link
741 if (ld->relay) {
742 logging_force( "Converting to direct link: " << ld );
743 ld->up = true;
744 ld->relay = false;
745 ld->localRelay = NodeID::UNSPECIFIED;
746 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
747 overMsg.setRelayLink( ld->remoteLinkId );
748 bc->sendMessage( ld->communicationId, &overMsg );
749 }
750
751 // compile and send update message
752 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
753 overlayMsg.setAutoLink( ld->autolink );
754 bc->sendMessage( ld->communicationId, &overlayMsg );
755 }
756}
757
758void BaseOverlay::onLinkDown(const LinkID& id,
759 const address_v* local, const address_v* remote) {
760
761 // get descriptor for link
762 LinkDescriptor* ld = getDescriptor(id, true);
763 if ( ld == NULL ) return; // not found? ->ignore!
764 logging_force( "onLinkDown descriptor: " << ld );
765
766 // inform listeners about link down
767 ld->communicationUp = false;
768 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
769 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
770
771 // delete all queued messages (auto links)
772 if( ld->messageQueue.size() > 0 ) {
773 logging_warn( "Dropping link " << id.toString() << " that has "
774 << ld->messageQueue.size() << " waiting messages" );
775 ld->flushQueue();
776 }
777
778 // erase mapping
779 eraseDescriptor(ld->overlayId);
780}
781
782void BaseOverlay::onLinkChanged(const LinkID& id,
783 const address_v* oldlocal, const address_v* newlocal,
784 const address_v* oldremote, const address_v* newremote) {
785
786 // get descriptor for link
787 LinkDescriptor* ld = getDescriptor(id, true);
788 if ( ld == NULL ) return; // not found? ->ignore!
789 logging_debug( "onLinkChanged descriptor: " << ld );
790
791 // inform listeners
792 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
793 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
794
795 // autolinks: refresh timestamp
796 ld->markAsUsed();
797}
798
799void BaseOverlay::onLinkFail(const LinkID& id,
800 const address_v* local, const address_v* remote) {
801 logging_debug( "Link fail with base communication link id=" << id );
802
803 // get descriptor for link
804 LinkDescriptor* ld = getDescriptor(id, true);
805 if ( ld == NULL ) return; // not found? ->ignore!
806 logging_debug( "Link failed id=" << ld->overlayId.toString() );
807
808 // inform listeners
809 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
810 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
811
812 // autolinks: refresh timestamp
813 ld->markAsUsed();
814}
815
816void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
817 const address_v* remote, const QoSParameterSet& qos) {
818 logging_debug( "Link quality changed with base communication link id=" << id );
819
820 // get descriptor for link
821 LinkDescriptor* ld = getDescriptor(id, true);
822 if ( ld == NULL ) return; // not found? ->ignore!
823 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
824
825 // autolinks: refresh timestamp
826 ld->markAsUsed();
827}
828
829bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
830 const address_v* remote ) {
831 logging_debug("Accepting link request from " << remote->to_string() );
832 return true;
833}
834
835/// handles a message from base communication
836bool BaseOverlay::receiveMessage(const Message* message,
837 const LinkID& link, const NodeID& ) {
838 // get descriptor for link
839 LinkDescriptor* ld = getDescriptor( link, true );
840
841 // link known?
842 if (ld == NULL) { // no-> handle with unspecified params
843 logging_debug("Received message from base communication, link descriptor unknown" );
844 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
845 } else { // yes -> handle with overlay link id
846 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
847 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
848 }
849}
850
851// ----------------------------------------------------------------------------
852
853/// handles a message from an overlay
854void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
855 logging_debug("Received message from overlay -- "
856 << " link id=" << link.toString()
857 << " node id=" << source.toString() );
858 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
859}
860
861// ----------------------------------------------------------------------------
862
863/// handles an incoming message
864bool BaseOverlay::handleMessage( const Message* message,
865 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
866 logging_debug( "Handling message: " << message->toString());
867
868 // decapsulate overlay message
869 OverlayMsg* overlayMsg =
870 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
871 if( overlayMsg == NULL ) return false;
872
873 // mark the link as in action
874 LinkDescriptor* ld = getDescriptor(boLink);
875 if (ld == NULL) ld = getDescriptor(bcLink, true);
876 if (ld != NULL) {
877 ld->markAsUsed();
878 ld->markAlive();
879 }
880
881 switch ( overlayMsg->getType() ) {
882 // ---------------------------------------------------------------------
883 // Handle spovnet instance join requests
884 // ---------------------------------------------------------------------
885 case OverlayMsg::typeJoinRequest: {
886
887 // decapsulate message
888 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
889 logging_info( "Received join request for spovnet " <<
890 joinReq->getSpoVNetID().toString() );
891
892 // check spovnet id
893 if( joinReq->getSpoVNetID() != spovnetId ) {
894 logging_error(
895 "Received join request for spovnet we don't handle " <<
896 joinReq->getSpoVNetID().toString() );
897 return false;
898 }
899
900 // TODO: here you can implement mechanisms to deny joining of a node
901 bool allow = true;
902 logging_info( "Sending join reply for spovnet " <<
903 spovnetId.toString() << " to node " <<
904 overlayMsg->getSourceNode().toString() <<
905 ". Result: " << (allow ? "allowed" : "denied") );
906 joiningNodes.push_back( overlayMsg->getSourceNode() );
907
908 // return overlay parameters
909 assert( overlayInterface != NULL );
910 logging_debug( "Using bootstrap end-point "
911 << getEndpointDescriptor().toString() )
912 OverlayParameterSet parameters = overlayInterface->getParameters();
913 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
914 JoinReply replyMsg( spovnetId, parameters,
915 allow, getEndpointDescriptor() );
916 retmsg.encapsulate(&replyMsg);
917 bc->sendMessage( bcLink, &retmsg );
918 return true;
919 }
920
921 // ---------------------------------------------------------------------
922 // handle replies to spovnet instance join requests
923 // ---------------------------------------------------------------------
924 case OverlayMsg::typeJoinReply: {
925
926 // decapsulate message
927 logging_debug("received join reply message");
928 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
929 assert(state == BaseOverlayStateJoinInitiated);
930
931 // correct spovnet?
932 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
933 logging_error( "Received SpoVNet join reply for " <<
934 replyMsg->getSpoVNetID().toString() <<
935 " != " << spovnetId.toString() );
936 return false;
937 }
938
939 // access granted? no -> fail
940 if( !replyMsg->getJoinAllowed() ) {
941 logging_error( "Our join request has been denied" );
942
943 // drop initiator link
944 bc->dropLink( initiatorLink );
945 initiatorLink = LinkID::UNSPECIFIED;
946 state = BaseOverlayStateInvalid;
947
948 // inform all registered services of the event
949 BOOST_FOREACH( NodeListener* i, nodeListeners )
950 i->onJoinFailed( spovnetId );
951 return true;
952 }
953
954 // access has been granted -> continue!
955 logging_info("Join request has been accepted for spovnet " <<
956 spovnetId.toString() );
957
958 // create overlay structure from spovnet parameter set
959 overlayInterface = OverlayFactory::create(
960 *this, replyMsg->getParam(), nodeId, this );
961
962 // overlay structure supported? no-> fail!
963 if( overlayInterface == NULL ) {
964 logging_error( "overlay structure not supported" );
965
966 bc->dropLink( initiatorLink );
967 initiatorLink = LinkID::UNSPECIFIED;
968 state = BaseOverlayStateInvalid;
969
970 // inform all registered services of the event
971 BOOST_FOREACH( NodeListener* i, nodeListeners )
972 i->onJoinFailed( spovnetId );
973
974 return true;
975 }
976
977 // everything ok-> join the overlay!
978 state = BaseOverlayStateCompleted;
979 overlayInterface->createOverlay();
980 logging_debug( "Using bootstrap end-point "
981 << replyMsg->getBootstrapEndpoint().toString() );
982 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
983
984 // update ovlvis
985 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
986
987 // inform all registered services of the event
988 BOOST_FOREACH( NodeListener* i, nodeListeners )
989 i->onJoinCompleted( spovnetId );
990
991 return true;
992 }
993
994 // ---------------------------------------------------------------------
995 // handle data forward messages
996 // ---------------------------------------------------------------------
997 case OverlayMsg::typeData: {
998
999 // get service
1000 const ServiceID& service = overlayMsg->getService();
1001 logging_debug( "received data for service " << service.toString() );
1002
1003 // find listener
1004 CommunicationListener* listener =
1005 communicationListeners.get( service );
1006 if( listener == NULL ) return true;
1007
1008 // delegate data message
1009 listener->onMessage( overlayMsg,
1010 overlayMsg->getSourceNode(), ld->overlayId );
1011
1012 return true;
1013 }
1014
1015 // ---------------------------------------------------------------------
1016 // handle update messages for link establishment
1017 // ---------------------------------------------------------------------
1018 case OverlayMsg::typeUpdate: {
1019 logging_debug("Received type update message on link " << ld );
1020
1021 // get info
1022 const NodeID& sourcenode = overlayMsg->getSourceNode();
1023 const ServiceID& service = overlayMsg->getService();
1024
1025 // no link descriptor available -> error!
1026 if( ld == NULL ) {
1027 logging_warn( "received overlay update message for link " <<
1028 ld->overlayId.toString() << " for which we have no mapping" );
1029 return false;
1030 }
1031
1032 // update our link mapping information for this link
1033 bool changed =
1034 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1035 ld->remoteNode = sourcenode;
1036 ld->service = service;
1037 ld->autolink = overlayMsg->isAutoLink();
1038
1039 // if our link information changed, we send out an update, too
1040 if( changed ) {
1041 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1042 overMsg.setAutoLink(ld->autolink);
1043 bc->sendMessage( ld->communicationId, &overMsg );
1044 }
1045
1046 // service registered? no-> error!
1047 if( !communicationListeners.contains( service ) ) {
1048 logging_warn( "Link up: event listener has not been registered" );
1049 return false;
1050 }
1051
1052 // default or no service registered?
1053 CommunicationListener* listener = communicationListeners.get( service );
1054 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1055 logging_warn("Link up: event listener is default or null!" );
1056 return true;
1057 }
1058
1059 // update descriptor
1060 ld->listener = listener;
1061 ld->markAsUsed();
1062 ld->markAlive();
1063
1064 // ask the service whether it wants to accept this link
1065 if( !listener->onLinkRequest(sourcenode) ) {
1066
1067 logging_debug("Link id=" << ld->overlayId.toString() <<
1068 " has been denied by service " << service.toString() << ", dropping link");
1069
1070 // prevent onLinkDown calls to the service
1071 ld->listener = &CommunicationListener::DEFAULT;
1072
1073 // drop the link
1074 dropLink( ld->overlayId );
1075 return true;
1076 }
1077
1078 // set link up
1079 ld->up = true;
1080 logging_debug(
1081 "Link " << ld->overlayId.toString()
1082 << " has been accepted by service " << service.toString()
1083 << " and is now up"
1084 );
1085
1086 // auto links: link has been accepted -> send queued messages
1087 if( ld->messageQueue.size() > 0 ) {
1088 logging_info( "sending out queued messages on link " <<
1089 ld->overlayId.toString() );
1090 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1091 sendMessage( msg, ld->overlayId );
1092 delete msg;
1093 }
1094 ld->messageQueue.clear();
1095 }
1096
1097 // call the notification functions
1098 listener->onLinkUp( ld->overlayId, sourcenode );
1099 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1100
1101 return true;
1102 }
1103
1104 // ---------------------------------------------------------------------
1105 // handle bye messages
1106 // ---------------------------------------------------------------------
1107 case OverlayMsg::typeBye: {
1108 logging_debug( "received bye message from " <<
1109 overlayMsg->getSourceNode().toString() );
1110
1111 /* if we are the initiator and receive a bye from a node
1112 * the node just left. if we are a node and receive a bye
1113 * from the initiator, we have to close, too.
1114 */
1115 if( overlayMsg->getSourceNode() == spovnetInitiator ) {
1116
1117 bc->dropLink( initiatorLink );
1118 initiatorLink = LinkID::UNSPECIFIED;
1119 state = BaseOverlayStateInvalid;
1120
1121 logging_fatal( "initiator ended spovnet" );
1122
1123 // inform all registered services of the event
1124 BOOST_FOREACH( NodeListener* i, nodeListeners )
1125 i->onLeaveFailed( spovnetId );
1126
1127 } else {
1128 // a node that said goodbye and we are the initiator don't have to
1129 // do much here, as the node also will go out of the overlay
1130 // structure
1131 logging_info( "node left " << overlayMsg->getSourceNode() );
1132 }
1133
1134 return true;
1135
1136 }
1137
1138 // ---------------------------------------------------------------------
1139 // handle link request forwarded through the overlay
1140 // ---------------------------------------------------------------------
1141 case OverlayMsg::typeLinkRequest: {
1142
1143 // decapsulate message
1144 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1145 const ServiceID& service = overlayMsg->getService();
1146
1147 // is request reply?
1148 if ( linkReq->isReply() ) {
1149
1150 // find link
1151 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1152 if ( i == pendingLinks.end() ) {
1153 logging_error( "Nonce not found in link request" );
1154 return true;
1155 }
1156
1157 // debug message
1158 logging_debug( "Link request reply received. Establishing link "
1159 << i->second << " to " << (linkReq->getEndpoint()->toString())
1160 << " for service " << service.toString()
1161 << " with nonce " << linkReq->getNonce()
1162 << " using relay " << linkReq->getRelay().toString()
1163 << " and remote link id=" << linkReq->getRemoteLinkId()
1164 );
1165
1166 // get descriptor
1167 LinkDescriptor* ldn = getDescriptor(i->second);
1168
1169 // check if link request reply has a relay node ...
1170 if (!linkReq->getRelay().isUnspecified()) { // yes->
1171 ldn->up = true;
1172 ldn->relay = true;
1173 if (ldn->localRelay.isUnspecified()) {
1174 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1175 showLinkState();
1176 }
1177 ldn->remoteRelay = linkReq->getRelay();
1178 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1179 ldn->remoteNode = overlayMsg->getSourceNode();
1180
1181 ldn->markAlive();
1182
1183 // compile and send update message
1184 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1185 _overlayMsg.setAutoLink(ldn->autolink);
1186 sendMessage( &_overlayMsg, ldn );
1187
1188 // auto links: link has been accepted -> send queued messages
1189 if( ldn->messageQueue.size() > 0 ) {
1190 logging_info( "Sending out queued messages on link " <<
1191 ldn->overlayId.toString() );
1192 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1193 sendMessage( msg, ldn->overlayId );
1194 delete msg;
1195 }
1196 ldn->messageQueue.clear();
1197 }
1198
1199 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1200
1201 // try to establish a direct link
1202 ldn->communicationId =
1203 bc->establishLink( *linkReq->getEndpoint(), i->second );
1204 }
1205
1206 // no relay node-> use overlay routing
1207 else {
1208 ldn->up = true;
1209
1210 // establish direct link
1211 ldn->communicationId =
1212 bc->establishLink( *linkReq->getEndpoint(), i->second );
1213 }
1214 } else {
1215 logging_debug( "Link request received from node id="
1216 << overlayMsg->getSourceNode() );
1217
1218 // create link descriptor
1219 LinkDescriptor* ldn =
1220 createLinkDescriptor(overlayMsg->getSourceNode(),
1221 overlayMsg->getService(), LinkID::UNSPECIFIED );
1222 assert(!ldn->overlayId.isUnspecified());
1223
1224 // create reply message
1225 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1226 LinkRequest link_request_msg(
1227 linkReq->getNonce(),
1228 &bc->getEndpointDescriptor(),
1229 true, ldn->overlayId, ldn->localRelay
1230 );
1231 overlay_msg.encapsulate( &link_request_msg );
1232
1233 // debug message
1234 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1235 linkReq->getNonce() );
1236
1237 // if this is a relay link-> update information & inform listeners
1238 if (!linkReq->getRelay().isUnspecified()) {
1239 // set flags
1240 ldn->up = true;
1241 ldn->relay = true;
1242 if (ldn->localRelay.isUnspecified()) {
1243 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1244 showLinkState();
1245 }
1246 ldn->remoteRelay = linkReq->getRelay();
1247 ldn->remoteNode = overlayMsg->getSourceNode();
1248 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1249 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1250 }
1251
1252 // route message back over overlay
1253 sendMessage( &overlay_msg, ldn );
1254 }
1255 return true;
1256 }
1257
1258 // ---------------------------------------------------------------------
1259 // handle relay message to forward messages
1260 // ---------------------------------------------------------------------
1261 case OverlayMsg::typeRelay: {
1262
1263 // decapsulate message
1264 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1265
1266 // is relay message informative?
1267 switch (relayMsg->getType()) {
1268
1269 // handle relay notification
1270 case RelayMessage::typeInform: {
1271 logging_info("Received relay information message with"
1272 << " relay " << relayMsg->getRelayNode()
1273 << " destination " << relayMsg->getDestNode() );
1274
1275 // mark incoming link as relay
1276 if (ld!=NULL) ld->markAsRelay();
1277
1278 // am I the destination of this message? yes->
1279 if (relayMsg->getDestNode() == nodeId ) {
1280 // deliver relay message locally!
1281 logging_debug("Relay message reached destination. Handling the message.");
1282 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1283 return true;
1284 }
1285
1286 // create route message
1287 OverlayMsg _overMsg( *overlayMsg );
1288 RelayMessage _relayMsg( *relayMsg );
1289 _relayMsg.setType( RelayMessage::typeRoute );
1290 _overMsg.encapsulate( &_relayMsg );
1291
1292 // forward message
1293 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1294 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1295 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1296 } else {
1297 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1298 overlayInterface->routeMessage(relayMsg->getRelayNode(), &_overMsg );
1299 }
1300 return true;
1301 }
1302
1303 // handle relay routing
1304 case RelayMessage::typeRoute: {
1305 logging_info("Received relay route message with"
1306 << " relay " << relayMsg->getRelayNode()
1307 << " destination " << relayMsg->getDestNode() );
1308
1309 // mark incoming link as relay
1310 if (ld!=NULL) ld->markAsRelay();
1311
1312 // am I the destination of this message? yes->
1313 if (relayMsg->getDestNode() == nodeId ) {
1314 // deliver relay message locally!
1315 logging_debug("Relay message reached destination. Handling the message.");
1316 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1317 return true;
1318 }
1319
1320 // am I the relay for this message? yes->
1321 if (relayMsg->getRelayNode() == nodeId ) {
1322 logging_debug("I'm the relay for this message. Sending to destination.");
1323 OverlayMsg _overMsg( *overlayMsg );
1324 RelayMessage _relayMsg( *relayMsg );
1325 _overMsg.encapsulate(&_relayMsg);
1326
1327 /// this must be handled by using relay link!
1328 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1329 return true;
1330 }
1331
1332 // error: I'm not a relay or destination!
1333 logging_error("This node is neither relay nor destination. Dropping Message!");
1334 return true;
1335 }
1336 default: {
1337 logging_error("RelayMessage Unknown!");
1338 return true;
1339 }
1340 }
1341
1342 break;
1343 }
1344
1345 // ---------------------------------------------------------------------
1346 // handle keep-alive messages
1347 // ---------------------------------------------------------------------
1348 case OverlayMsg::typeKeepAlive: {
1349 if ( ld != NULL ) {
1350 //logging_force("Keep-Alive for "<< ld->overlayId);
1351 ld->markAlive();
1352 }
1353 break;
1354 }
1355
1356 // ---------------------------------------------------------------------
1357 // handle direct link replacement messages
1358 // ---------------------------------------------------------------------
1359 case OverlayMsg::typeDirectLink: {
1360 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1361 logging_force( "Received direct link convert notification for " << rld );
1362 rld->communicationId = ld->communicationId;
1363 rld->communicationUp = true;
1364 rld->relay = false;
1365 rld->localRelay = NodeID::UNSPECIFIED;
1366 rld->remoteRelay = NodeID::UNSPECIFIED;
1367 eraseDescriptor(ld->overlayId);
1368 break;
1369 }
1370
1371 // ---------------------------------------------------------------------
1372 // handle unknown message type
1373 // ---------------------------------------------------------------------
1374 default: {
1375 logging_error( "received message in invalid state! don't know " <<
1376 "what to do with this message of type " <<
1377 overlayMsg->getType() );
1378 return false;
1379 }
1380
1381 } /* switch */
1382 return false;
1383}
1384
1385// ----------------------------------------------------------------------------
1386
1387void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1388
1389 logging_debug( "broadcasting message to all known nodes " <<
1390 "in the overlay from service " + service.toString() );
1391
1392 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
1393 OverlayInterface::NodeList::iterator i = nodes.begin();
1394 for(; i != nodes.end(); i++ ) {
1395 if( *i == nodeId) continue; // don't send to ourselfs
1396 sendMessage( message, *i, service );
1397 }
1398}
1399
1400vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
1401 // the known nodes _can_ also include our node, so we remove ourself
1402 vector<NodeID> nodes = overlayInterface->getKnownNodes();
1403 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1404 if( i != nodes.end() ) nodes.erase( i );
1405 return nodes;
1406}
1407
1408const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1409 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1410 const LinkDescriptor* ld = getDescriptor(lid);
1411 if( ld == NULL ) return NodeID::UNSPECIFIED;
1412 else return ld->remoteNode;
1413}
1414
1415vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1416 vector<LinkID> linkvector;
1417 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1418 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1419 linkvector.push_back( ld->overlayId );
1420 }
1421 }
1422 return linkvector;
1423}
1424
1425
1426void BaseOverlay::onNodeJoin(const NodeID& node) {
1427 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1428 if( i == joiningNodes.end() ) return;
1429
1430 logging_info( "node has successfully joined baseoverlay and overlay structure "
1431 << node.toString() );
1432
1433 joiningNodes.erase( i );
1434}
1435
1436void BaseOverlay::eventFunction() {
1437
1438 // send keep-alive messages over established links
1439 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1440 if (!ld->up) continue;
1441 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1442 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1443 sendMessage( &overMsg, ld );
1444 }
1445
1446 // iterate over all links and check for time boundaries
1447 vector<LinkDescriptor*> oldlinks;
1448 time_t now = time(NULL);
1449 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1450 // remote used as relay flag
1451 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1452 ld->usedAsRelay = false;
1453
1454 // keep alives missed? yes->
1455 if ( !ld->up && difftime( now, ld->keepAliveTime ) > 2 ) {
1456
1457 // increase counter
1458 ld->keepAliveMissed++;
1459
1460 // missed more than four keep-alive messages (4 sec)? -> drop link
1461 if (ld->keepAliveMissed > 10) {
1462 logging_force( "Link connection request is stale, closing: " << ld );
1463 oldlinks.push_back( ld );
1464 }
1465 }
1466
1467 if (!ld->up) continue;
1468
1469 // drop links that are dropped and not used as relay
1470 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink)
1471 oldlinks.push_back( ld );
1472 else
1473
1474 // auto-link time exceeded?
1475 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 )
1476 oldlinks.push_back( ld );
1477
1478 else
1479
1480 // keep alives missed? yes->
1481 if ( !ld->autolink && difftime( now, ld->keepAliveTime ) > 2 ) {
1482
1483 // increase counter
1484 ld->keepAliveMissed++;
1485
1486 // missed more than four keep-alive messages (4 sec)? -> drop link
1487 if (ld->keepAliveMissed >= 8) {
1488 logging_force( "Link is stale, closing: " << ld );
1489 oldlinks.push_back( ld );
1490 }
1491 }
1492 }
1493
1494 // show link state
1495 counter++;
1496 if (counter>=4) showLinkState();
1497 if (counter>=4 || counter<0) counter = 0;
1498
1499 // drop links
1500 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1501 if (!ld->communicationId.isUnspecified() && ld->communicationId == initiatorLink) {
1502 logging_force( "Not dropping initiator link: " << ld );
1503 continue;
1504 }
1505 logging_force( "Link timed out. Dropping " << ld );
1506 dropLink( ld->overlayId );
1507 }
1508}
1509
1510void BaseOverlay::showLinkState() {
1511 int i=0;
1512 logging_force("--- link state -------------------------------");
1513 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1514 logging_force("link " << i << ": " << ld);
1515 i++;
1516 }
1517 logging_force("----------------------------------------------");
1518}
1519
1520}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.