close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 5274

Last change on this file since 5274 was 5274, checked in by Christoph Mayer, 15 years ago

basti relay fix

File size: 47.8 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62#define logging_force(x) std::cout << x << std::endl;
63#define logging_force1(x) std::cout << x << std::endl;
64
65LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
66 BOOST_FOREACH( LinkDescriptor* lp, links )
67 if ((communication ? lp->communicationId : lp->overlayId) == link)
68 return lp;
69 return NULL;
70}
71
72const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
73 BOOST_FOREACH( const LinkDescriptor* lp, links )
74 if ((communication ? lp->communicationId : lp->overlayId) == link)
75 return lp;
76 return NULL;
77}
78
79LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
80 BOOST_FOREACH( LinkDescriptor* lp, links )
81 if (lp->autolink && lp->remoteNode == node && lp->service == service)
82 return lp;
83 return NULL;
84}
85
86void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
87 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
88 LinkDescriptor* ld = *i;
89 if ((communication ? ld->communicationId : ld->overlayId) == link) {
90 delete ld;
91 links.erase(i);
92 break;
93 }
94 }
95}
96
97LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
98 LinkDescriptor* desc = getDescriptor( link );
99 if ( desc == NULL ) {
100 desc = new LinkDescriptor();
101 desc->overlayId = link;
102 links.push_back(desc);
103 }
104 return desc;
105}
106
107/// returns a direct link relay descriptor to the given relay node
108LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) {
109 BOOST_FOREACH( LinkDescriptor* lp, links )
110 if (lp->remoteNode == relayNode &&
111 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
112 lp->relay == false &&
113 lp->up)
114 return lp;
115 return NULL;
116}
117
118/// find a proper relay node
119const NodeID BaseOverlay::findRelayNode( const NodeID& id ) {
120 LinkDescriptor* rld = NULL;
121 NodeID relayNode = NodeID::UNSPECIFIED;
122
123 // get used next hop towards node
124 LinkID rlid = overlayInterface->getNextLinkId(id);
125 while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
126
127 // get descriptor of first hop
128 rld = getDescriptor(rlid);
129 logging_force1( rld );
130
131 // is first hop a relay path? yes-> try to find real link!
132 if ( rld->relay )
133 relayNode = getRelayDescriptor(rld->localRelay)->remoteNode;
134
135 // no-> a proper relay node has been found
136 else relayNode = rld->remoteNode;
137 }
138 logging_force1( "Potential relay node " << relayNode.toString() );
139 // do not return myself or use the node as relay node
140 if (relayNode == nodeId)
141 return NodeID::UNSPECIFIED;
142 else {
143 logging_force1( "Returning relay node " << relayNode.toString() );
144 return relayNode;
145 }
146}
147
148/// forwards a message over relays/overlay/directly using link descriptor
149seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
150
151 // directly send message
152 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
153 logging_debug("sendMessage: Sending message via Base Communication");
154 return bc->sendMessage( ld->communicationId, message );
155 }
156
157 // relay message
158 else if ( ld->relay ) {
159 logging_debug("sendMessage: Relaying message to node "
160 << ld->remoteNode.toString()
161 << " using relay " << ld->localRelay
162 );
163
164 // get local relay link descriptor and mark as used for relaying
165 LinkDescriptor* rld = getRelayDescriptor(ld->localRelay);
166 if (rld==NULL) {
167 logging_error("sendMessage: Relay descriptor for relay " <<
168 ld->localRelay.toString() << " unknown.");
169 return -1;
170 }
171 rld->markAsRelay();
172
173 // create a information relay message to inform the relay about
174 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
175 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
176 relayMsg.encapsulate( message );
177 overlay_msg.encapsulate( &relayMsg );
178
179 // route message to relay node in order to inform it!
180 logging_debug("sendMessage: Sending message over relayed link with" << ld );
181 overlayInterface->routeMessage(rld->remoteNode, rld->overlayId, &overlay_msg);
182 return 0;
183 }
184
185 // route message using overlay
186 else {
187 logging_error("Could not send message");
188 logging_debug( "sendMessage: Routing message to node " << ld->remoteNode.toString() );
189 overlayInterface->routeMessage( ld->remoteNode, message );
190 return 0;
191 }
192
193 return -1;
194}
195
196/// creates a link descriptor, apply relay semantics if possible
197LinkDescriptor* BaseOverlay::createLinkDescriptor(
198 const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ) {
199
200 // find listener
201 if( !communicationListeners.contains( service ) ) {
202 logging_error( "No listener found for service " << service.toString() );
203 return NULL;
204 }
205 CommunicationListener* listener = communicationListeners.get( service );
206 assert( listener != NULL );
207
208 // copy link id
209 LinkID linkid = link_id;
210
211 // create link id if necessary
212 if ( linkid.isUnspecified() )
213 linkid = LinkID::create();
214
215 // create relay link descriptor
216 NodeID relayNode = findRelayNode(remoteNode);
217
218 // add descriptor
219 LinkDescriptor* ld = addDescriptor( linkid );
220 ld->overlayId = linkid;
221 ld->service = service;
222 ld->listener = listener;
223 ld->remoteNode = remoteNode;
224
225 // set relay node if available
226 ld->relay = !relayNode.isUnspecified();
227 ld->localRelay = relayNode;
228
229 if (!ld->relay)
230 logging_error("No relay found!");
231
232 // debug output
233 logging_debug( "Created link descriptor: " << ld );
234
235 return ld;
236}
237
238
239// ----------------------------------------------------------------------------
240
241use_logging_cpp(BaseOverlay);
242
243// ----------------------------------------------------------------------------
244
245BaseOverlay::BaseOverlay() :
246 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
247 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED),
248 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT) {
249}
250
251BaseOverlay::~BaseOverlay() {
252}
253
254// ----------------------------------------------------------------------------
255
256void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
257 logging_info("Starting...");
258
259 // set parameters
260 bc = &_basecomm;
261 nodeId = _nodeid;
262
263 // register at base communication
264 bc->registerMessageReceiver( this );
265 bc->registerEventListener( this );
266
267 // timer for auto link management
268 Timer::setInterval( 500 );
269 Timer::start();
270}
271
272void BaseOverlay::stop() {
273 logging_info("Stopping...");
274
275 // stop timer
276 Timer::stop();
277
278 // delete oberlay interface
279 if(overlayInterface != NULL) {
280 delete overlayInterface;
281 overlayInterface = NULL;
282 }
283
284 // unregister at base communication
285 bc->unregisterMessageReceiver( this );
286 bc->unregisterEventListener( this );
287}
288
289// ----------------------------------------------------------------------------
290
291void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
292 const EndpointDescriptor& bootstrapEp) {
293
294 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
295 logging_info( "Starting to join spovnet " << id.toString() <<
296 " with nodeid " << nodeId.toString());
297
298 // contact the spovnet initiator and request to join. if the join is granted we will
299 // receive further information on the structure of the overlay that is used in the spovnet
300 // but first, we have to establish a link to the initiator...
301 spovnetId = id;
302 state = BaseOverlayStateJoinInitiated;
303
304 initiatorLink = bc->establishLink( bootstrapEp );
305 logging_info("join process initiated for " << id.toString() << "...");
306}
307
308void BaseOverlay::leaveSpoVNet() {
309
310 logging_info( "Leaving spovnet " << spovnetId );
311 bool ret = ( state != this->BaseOverlayStateInvalid );
312
313 logging_debug( "Dropping all auto-links" );
314
315 // gather all service links
316 vector<LinkID> servicelinks;
317 BOOST_FOREACH( LinkDescriptor* ld, links ) {
318 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
319 servicelinks.push_back( ld->overlayId );
320 }
321
322 // drop all service links
323 BOOST_FOREACH( LinkID lnk, servicelinks )
324 dropLink( lnk );
325
326 // let the node leave the spovnet overlay interface
327 logging_debug( "Leaving overlay" );
328 if( overlayInterface != NULL )
329 overlayInterface->leaveOverlay();
330
331 // leave spovnet
332 if( state != BaseOverlayStateInitiator ) {
333 // then, leave the spovnet baseoverlay
334 OverlayMsg overMsg( OverlayMsg::typeBye, nodeId );
335 bc->sendMessage( initiatorLink, &overMsg );
336
337 // drop the link and set to correct state
338 bc->dropLink( initiatorLink );
339 initiatorLink = LinkID::UNSPECIFIED;
340 }
341
342 // change to inalid state
343 state = BaseOverlayStateInvalid;
344 ovl.visShutdown( ovlId, nodeId, string("") );
345
346 // inform all registered services of the event
347 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
348 if( ret ) i->onLeaveCompleted( spovnetId );
349 else i->onLeaveFailed( spovnetId );
350 }
351}
352
353void BaseOverlay::createSpoVNet(const SpoVNetID& id,
354 const OverlayParameterSet& param,
355 const SecurityParameterSet& sec,
356 const QoSParameterSet& qos) {
357
358 // set the state that we are an initiator, this way incoming messages are
359 // handled correctly
360 logging_info( "creating spovnet " + id.toString() <<
361 " with nodeid " << nodeId.toString() );
362
363 spovnetId = id;
364 state = BaseOverlayStateInitiator;
365
366 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
367 if( overlayInterface == NULL ) {
368 logging_fatal( "overlay structure not supported" );
369 state = BaseOverlayStateInvalid;
370 return;
371 }
372
373 // bootstrap against ourselfs
374 overlayInterface->joinOverlay();
375 BOOST_FOREACH( NodeListener* i, nodeListeners )
376 i->onJoinCompleted( spovnetId );
377
378 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
379 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
380}
381
382// ----------------------------------------------------------------------------
383
384const LinkID BaseOverlay::establishLink(
385 const EndpointDescriptor& ep, const NodeID& nodeid,
386 const ServiceID& service, const LinkID& linkid ) {
387
388 LinkID link_id = linkid;
389
390 // establish link via overlay
391 if (!nodeid.isUnspecified())
392 link_id = establishLink( nodeid, service, link_id );
393
394 // establish link directly if only ep is known
395 if (nodeid.isUnspecified())
396 establishLink( ep, service, link_id );
397
398 return link_id;
399}
400
401/// call base communication's establish link and add link mapping
402const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
403 const ServiceID& service, const LinkID& linkid ) {
404
405 // create a new link id if necessary
406 LinkID link_id = linkid;
407 if (link_id.isUnspecified()) link_id = LinkID::create();
408
409 /// find a service listener
410 if( !communicationListeners.contains( service ) ) {
411 logging_error( "No listener registered for service id=" << service.toString() );
412 return LinkID::UNSPECIFIED;
413 }
414 CommunicationListener* listener = communicationListeners.get( service );
415 assert( listener != NULL );
416
417 /// establish link and add mapping
418 logging_info("Establishing direct link " << link_id.toString()
419 << " using " << ep.toString());
420
421 // create descriptor
422 LinkDescriptor* ld = addDescriptor( link_id );
423 ld->overlayId = link_id;
424 ld->communicationId = link_id;
425 ld->listener = listener;
426 ld->service = service;
427 bc->establishLink( ep, link_id );
428
429 return link_id;
430}
431
432/// establishes a link between two arbitrary nodes
433const LinkID BaseOverlay::establishLink( const NodeID& node,
434 const ServiceID& service, const LinkID& link_id ) {
435
436 // do not establish a link to myself!
437 if (node == nodeId) return LinkID::UNSPECIFIED;
438
439 // create a link descriptor
440 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
441
442 // create link request message with own link id
443 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
444 LinkRequest link_request_msg(
445 nonce, &bc->getEndpointDescriptor(), false,
446 ld->overlayId, ld->localRelay );
447 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
448 overlay_msg.encapsulate( &link_request_msg );
449 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
450
451 // debug message
452 logging_debug(
453 "Sending link request with"
454 << " link id=" << ld->overlayId
455 << " node id=" << ld->remoteNode.toString()
456 << " service id=" << ld->service.toString()
457 << " local relay id=" << ld->localRelay.toString()
458 << " nonce= " << nonce
459 );
460
461 // sending message through new link
462 sendMessage( &overlay_msg, ld );
463
464 return ld->overlayId;
465}
466
467/// drops an established link
468void BaseOverlay::dropLink(const LinkID& link) {
469 logging_debug( "Dropping link (initiated locally):" << link.toString() );
470
471 // find the link item to drop
472 LinkDescriptor* ld = getDescriptor(link);
473 if( ld == NULL ) {
474 logging_warn( "Can't drop link, link is unknown!");
475 return;
476 }
477
478 // delete all queued messages
479 if( ld->messageQueue.size() > 0 ) {
480 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
481 << ld->messageQueue.size() << " waiting messages" );
482 ld->flushQueue();
483 }
484
485 // inform sideport and listener
486 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
487 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
488
489 // do not drop relay links
490 if (!ld->usedAsRelay) {
491 // drop the link in base communication
492 if (ld->communicationUp) bc->dropLink( ld->communicationId );
493
494 // erase descriptor
495 eraseDescriptor( ld->overlayId );
496 } else
497 ld->dropWhenRelaysLeft = true;
498}
499
500// ----------------------------------------------------------------------------
501
502/// internal send message, always use this functions to send messages over links
503seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
504 logging_debug( "Sending data message on link " << link.toString() );
505
506 // get the mapping for this link
507 LinkDescriptor* ld = getDescriptor(link);
508 if( ld == NULL ) {
509 logging_error("Could not send message. "
510 << "Link not found id=" << link.toString());
511 return -1;
512 }
513
514 // check if the link is up yet, if its an auto link queue message
515 if( !ld->up ) {
516 ld->markAsUsed();
517 if( ld->autolink ) {
518 logging_info("Auto-link " << link.toString() << " not up, queue message");
519 Data data = data_serialize( message );
520 const_cast<Message*>(message)->dropPayload();
521 ld->messageQueue.push_back( new Message(data) );
522 } else {
523 logging_error("Link " << link.toString() << " not up, drop message");
524 }
525 return -1;
526 }
527
528 // compile overlay message (has service and node id)
529 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
530 overmsg.encapsulate( const_cast<Message*>(message) );
531
532 // send message over relay/direct/overlay
533 return sendMessage( &overmsg, ld );
534}
535
536seqnum_t BaseOverlay::sendMessage(const Message* message,
537 const NodeID& node, const ServiceID& service) {
538
539 // find link for node and service
540 LinkDescriptor* ld = getAutoDescriptor( node, service );
541
542 // if we found no link, create an auto link
543 if( ld == NULL ) {
544
545 // debug output
546 logging_info( "No link to send message to node "
547 << node.toString() << " found for service "
548 << service.toString() << ". Creating auto link ..."
549 );
550
551 // this will call onlinkup on us, if everything worked we now have a mapping
552 LinkID link = LinkID::create();
553
554 // call base overlay to create a link
555 link = establishLink( node, service, link );
556 ld = getDescriptor( link );
557 if( ld == NULL ) {
558 logging_error( "Failed to establish auto-link.");
559 return -1;
560 }
561 ld->autolink = true;
562
563 logging_debug( "Auto-link establishment in progress to node "
564 << node.toString() << " with link id=" << link.toString() );
565 }
566 assert(ld != NULL);
567
568 // mark the link as used, as we now send a message through it
569 ld->markAsUsed();
570
571 // send / queue message
572 return sendMessage( message, ld->overlayId );
573}
574
575// ----------------------------------------------------------------------------
576
577const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
578 const LinkID& link) const {
579
580 // return own end-point descriptor
581 if( link == LinkID::UNSPECIFIED )
582 return bc->getEndpointDescriptor();
583
584 // find link descriptor. not found -> return unspecified
585 const LinkDescriptor* ld = getDescriptor(link);
586 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED;
587
588 // return endpoint-descriptor from base communication
589 return bc->getEndpointDescriptor( ld->communicationId );
590}
591
592const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
593 const NodeID& node) const {
594
595 // return own end-point descriptor
596 if( node == nodeId || node == NodeID::UNSPECIFIED )
597 return bc->getEndpointDescriptor();
598
599 // no joined and request remote descriptor? -> fail!
600 if( overlayInterface == NULL ) {
601 logging_error( "overlay interface not set, cannot resolve endpoint" );
602 return EndpointDescriptor::UNSPECIFIED;
603 }
604
605 // resolve end-point descriptor from the base-overlay routing table
606 return overlayInterface->resolveNode( node );
607}
608
609// ----------------------------------------------------------------------------
610
611bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
612 sideport = _sideport;
613 _sideport->configure( this );
614}
615
616bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
617 sideport = &SideportListener::DEFAULT;
618}
619
620// ----------------------------------------------------------------------------
621
622bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
623 logging_debug( "binding communication listener " << listener
624 << " on serviceid " << sid.toString() );
625
626 if( communicationListeners.contains( sid ) ) {
627 logging_error( "some listener already registered for service id "
628 << sid.toString() );
629 return false;
630 }
631
632 communicationListeners.registerItem( listener, sid );
633 return true;
634}
635
636
637bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
638 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
639
640 if( !communicationListeners.contains( sid ) ) {
641 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
642 return false;
643 }
644
645 if( communicationListeners.get(sid) != listener ) {
646 logging_warn( "listener bound to service id " << sid.toString()
647 << " is different than listener trying to unbind" );
648 return false;
649 }
650
651 communicationListeners.unregisterItem( sid );
652 return true;
653}
654
655// ----------------------------------------------------------------------------
656
657bool BaseOverlay::bind(NodeListener* listener) {
658 logging_debug( "Binding node listener " << listener );
659
660 // already bound? yes-> warning
661 NodeListenerVector::iterator i =
662 find( nodeListeners.begin(), nodeListeners.end(), listener );
663 if( i != nodeListeners.end() ) {
664 logging_warn("Node listener " << listener << " is already bound!" );
665 return false;
666 }
667
668 // no-> add
669 nodeListeners.push_back( listener );
670 return true;
671}
672
673bool BaseOverlay::unbind(NodeListener* listener) {
674 logging_debug( "Unbinding node listener " << listener );
675
676 // already unbound? yes-> warning
677 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
678 if( i == nodeListeners.end() ) {
679 logging_warn( "Node listener " << listener << " is not bound!" );
680 return false;
681 }
682
683 // no-> remove
684 nodeListeners.erase( i );
685 return true;
686}
687
688// ----------------------------------------------------------------------------
689
690void BaseOverlay::onLinkUp(const LinkID& id,
691 const NetworkLocator* local, const NetworkLocator* remote) {
692 logging_debug( "Link up with base communication link id=" << id );
693
694 // get descriptor for link
695 LinkDescriptor* ld = getDescriptor(id, true);
696
697 // handle initiator link
698 if(state == BaseOverlayStateJoinInitiated && id == initiatorLink) {
699 logging_info(
700 "Join has been initiated by me and the link is now up. " <<
701 "Sending out join request for SpoVNet " << spovnetId.toString()
702 );
703
704 // send join request message
705 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
706 JoinRequest joinRequest( spovnetId, nodeId );
707 overlayMsg.encapsulate( &joinRequest );
708 bc->sendMessage( id, &overlayMsg );
709 return;
710 }
711
712 // no link found? -> link establishment from remote, add one!
713 if (ld == NULL) {
714 ld = addDescriptor( id );
715 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
716
717 // update descriptor
718 ld->fromRemote = true;
719 ld->communicationId = id;
720 ld->communicationUp = true;
721 ld->markAsUsed();
722
723 // in this case, do not inform listener, since service it unknown
724 // -> wait for update message!
725
726 // link mapping found? -> send update message with node-id and service id
727 } else {
728 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
729
730 // note: necessary to validate the link on the remote side!
731 logging_debug( "Sending out update" <<
732 " for service " << ld->service.toString() <<
733 " with local node id " << nodeId.toString() <<
734 " on link " << ld->overlayId.toString() );
735
736 // update descriptor
737 ld->markAsUsed();
738 ld->communicationUp = true;
739
740 // if link is a relayed link ->convert to direct link
741 if (ld->relay) {
742 ld->up = true;
743 ld->relay = false;
744 ld->localRelay = NodeID::UNSPECIFIED;
745 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
746 overMsg.setRelayLink( ld->remoteLinkId );
747 bc->sendMessage( ld->communicationId, &overMsg );
748 }
749
750 // compile and send update message
751 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
752 overlayMsg.setAutoLink( ld->autolink );
753 bc->sendMessage( ld->communicationId, &overlayMsg );
754 }
755}
756
757void BaseOverlay::onLinkDown(const LinkID& id,
758 const NetworkLocator* local, const NetworkLocator* remote) {
759
760 // get descriptor for link
761 LinkDescriptor* ld = getDescriptor(id, true);
762 if ( ld == NULL ) return; // not found? ->ignore!
763 logging_force( "onLinkDown descriptor: " << ld );
764
765 // inform listeners about link down
766 ld->communicationUp = false;
767 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
768 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
769
770 // delete all queued messages (auto links)
771 if( ld->messageQueue.size() > 0 ) {
772 logging_warn( "Dropping link " << id.toString() << " that has "
773 << ld->messageQueue.size() << " waiting messages" );
774 ld->flushQueue();
775 }
776
777 // erase mapping
778 eraseDescriptor(ld->overlayId);
779}
780
781void BaseOverlay::onLinkChanged(const LinkID& id,
782 const NetworkLocator* oldlocal, const NetworkLocator* newlocal,
783 const NetworkLocator* oldremote, const NetworkLocator* newremote) {
784
785 // get descriptor for link
786 LinkDescriptor* ld = getDescriptor(id, true);
787 if ( ld == NULL ) return; // not found? ->ignore!
788 logging_debug( "onLinkChanged descriptor: " << ld );
789
790 // inform listeners
791 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
792 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
793
794 // autolinks: refresh timestamp
795 ld->markAsUsed();
796}
797
798void BaseOverlay::onLinkFail(const LinkID& id,
799 const NetworkLocator* local, const NetworkLocator* remote) {
800 logging_debug( "Link fail with base communication link id=" << id );
801
802 // get descriptor for link
803 LinkDescriptor* ld = getDescriptor(id, true);
804 if ( ld == NULL ) return; // not found? ->ignore!
805 logging_debug( "Link failed id=" << ld->overlayId.toString() );
806
807 // inform listeners
808 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
809 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
810
811 // autolinks: refresh timestamp
812 ld->markAsUsed();
813}
814
815void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local,
816 const NetworkLocator* remote, const QoSParameterSet& qos) {
817 logging_debug( "Link quality changed with base communication link id=" << id );
818
819 // get descriptor for link
820 LinkDescriptor* ld = getDescriptor(id, true);
821 if ( ld == NULL ) return; // not found? ->ignore!
822 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
823
824 // autolinks: refresh timestamp
825 ld->markAsUsed();
826}
827
828bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local,
829 const NetworkLocator* remote ) {
830 logging_debug("Accepting link request from " << remote->toString() );
831 return true;
832}
833
834/// handles a message from base communication
835bool BaseOverlay::receiveMessage(const Message* message,
836 const LinkID& link, const NodeID& ) {
837 // get descriptor for link
838 LinkDescriptor* ld = getDescriptor( link, true );
839
840 // link known?
841 if (ld == NULL) { // no-> handle with unspecified params
842 logging_debug("Received message from base communication, link descriptor unknown" );
843 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
844 } else { // yes -> handle with overlay link id
845 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
846 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
847 }
848}
849
850// ----------------------------------------------------------------------------
851
852/// handles a message from an overlay
853void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
854 logging_debug("Received message from overlay -- "
855 << " link id=" << link.toString()
856 << " node id=" << source.toString() );
857 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
858}
859
860// ----------------------------------------------------------------------------
861
862/// handles an incoming message
863bool BaseOverlay::handleMessage( const Message* message,
864 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
865 logging_debug( "Handling message: " << message->toString());
866
867 // decapsulate overlay message
868 OverlayMsg* overlayMsg =
869 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
870 if( overlayMsg == NULL ) return false;
871
872 // mark the link as in action
873 LinkDescriptor* ld = getDescriptor(boLink);
874 if (ld == NULL) ld = getDescriptor(bcLink, true);
875 if (ld != NULL) {
876 ld->markAsUsed();
877 ld->markAlive();
878 }
879
880 switch ( overlayMsg->getType() ) {
881 // ---------------------------------------------------------------------
882 // Handle spovnet instance join requests
883 // ---------------------------------------------------------------------
884 case OverlayMsg::typeJoinRequest: {
885
886 // decapsulate message
887 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
888 logging_info( "Received join request for spovnet " <<
889 joinReq->getSpoVNetID().toString() );
890
891 // check spovnet id
892 if( joinReq->getSpoVNetID() != spovnetId ) {
893 logging_error(
894 "Received join request for spovnet we don't handle " <<
895 joinReq->getSpoVNetID().toString() );
896 return false;
897 }
898
899 // TODO: here you can implement mechanisms to deny joining of a node
900 bool allow = true;
901 logging_info( "Sending join reply for spovnet " <<
902 spovnetId.toString() << " to node " <<
903 overlayMsg->getSourceNode().toString() <<
904 ". Result: " << (allow ? "allowed" : "denied") );
905 joiningNodes.push_back( overlayMsg->getSourceNode() );
906
907 // return overlay parameters
908 assert( overlayInterface != NULL );
909 logging_debug( "Using bootstrap end-point "
910 << getEndpointDescriptor().toString() )
911 OverlayParameterSet parameters = overlayInterface->getParameters();
912 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
913 JoinReply replyMsg( spovnetId, parameters,
914 allow, getEndpointDescriptor() );
915 retmsg.encapsulate(&replyMsg);
916 bc->sendMessage( bcLink, &retmsg );
917 return true;
918 }
919
920 // ---------------------------------------------------------------------
921 // handle replies to spovnet instance join requests
922 // ---------------------------------------------------------------------
923 case OverlayMsg::typeJoinReply: {
924
925 // decapsulate message
926 logging_debug("received join reply message");
927 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
928 assert(state == BaseOverlayStateJoinInitiated);
929
930 // correct spovnet?
931 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
932 logging_error( "Received SpoVNet join reply for " <<
933 replyMsg->getSpoVNetID().toString() <<
934 " != " << spovnetId.toString() );
935 return false;
936 }
937
938 // access granted? no -> fail
939 if( !replyMsg->getJoinAllowed() ) {
940 logging_error( "Our join request has been denied" );
941
942 // drop initiator link
943 bc->dropLink( initiatorLink );
944 initiatorLink = LinkID::UNSPECIFIED;
945 state = BaseOverlayStateInvalid;
946
947 // inform all registered services of the event
948 BOOST_FOREACH( NodeListener* i, nodeListeners )
949 i->onJoinFailed( spovnetId );
950 return true;
951 }
952
953 // access has been granted -> continue!
954 logging_info("Join request has been accepted for spovnet " <<
955 spovnetId.toString() );
956
957 // create overlay structure from spovnet parameter set
958 overlayInterface = OverlayFactory::create(
959 *this, replyMsg->getParam(), nodeId, this );
960
961 // overlay structure supported? no-> fail!
962 if( overlayInterface == NULL ) {
963 logging_error( "overlay structure not supported" );
964
965 bc->dropLink( initiatorLink );
966 initiatorLink = LinkID::UNSPECIFIED;
967 state = BaseOverlayStateInvalid;
968
969 // inform all registered services of the event
970 BOOST_FOREACH( NodeListener* i, nodeListeners )
971 i->onJoinFailed( spovnetId );
972
973 return true;
974 }
975
976 // everything ok-> join the overlay!
977 state = BaseOverlayStateCompleted;
978 overlayInterface->createOverlay();
979 logging_debug( "Using bootstrap end-point "
980 << replyMsg->getBootstrapEndpoint().toString() );
981 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
982
983 // update ovlvis
984 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
985
986 // inform all registered services of the event
987 BOOST_FOREACH( NodeListener* i, nodeListeners )
988 i->onJoinCompleted( spovnetId );
989
990 return true;
991 }
992
993 // ---------------------------------------------------------------------
994 // handle data forward messages
995 // ---------------------------------------------------------------------
996 case OverlayMsg::typeData: {
997
998 // get service
999 const ServiceID& service = overlayMsg->getService();
1000 logging_debug( "received data for service " << service.toString() );
1001
1002 // find listener
1003 CommunicationListener* listener =
1004 communicationListeners.get( service );
1005 if( listener == NULL ) return true;
1006
1007 // delegate data message
1008 listener->onMessage( overlayMsg,
1009 overlayMsg->getSourceNode(), ld->overlayId );
1010
1011 return true;
1012 }
1013
1014 // ---------------------------------------------------------------------
1015 // handle update messages for link establishment
1016 // ---------------------------------------------------------------------
1017 case OverlayMsg::typeUpdate: {
1018 logging_debug("Received type update message on link " << ld );
1019
1020 // get info
1021 const NodeID& sourcenode = overlayMsg->getSourceNode();
1022 const ServiceID& service = overlayMsg->getService();
1023
1024 // no link descriptor available -> error!
1025 if( ld == NULL ) {
1026 logging_warn( "received overlay update message for link " <<
1027 ld->overlayId.toString() << " for which we have no mapping" );
1028 return false;
1029 }
1030
1031 // update our link mapping information for this link
1032 bool changed =
1033 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1034 ld->remoteNode = sourcenode;
1035 ld->service = service;
1036 ld->autolink = overlayMsg->isAutoLink();
1037
1038 // if our link information changed, we send out an update, too
1039 if( changed ) {
1040 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1041 overMsg.setAutoLink(ld->autolink);
1042 bc->sendMessage( ld->communicationId, &overMsg );
1043 }
1044
1045 // service registered? no-> error!
1046 if( !communicationListeners.contains( service ) ) {
1047 logging_warn( "Link up: event listener has not been registered" );
1048 return false;
1049 }
1050
1051 // default or no service registered?
1052 CommunicationListener* listener = communicationListeners.get( service );
1053 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1054 logging_warn("Link up: event listener is default or null!" );
1055 return true;
1056 }
1057
1058 // update descriptor
1059 ld->listener = listener;
1060 ld->markAsUsed();
1061 ld->markAlive();
1062
1063 // ask the service whether it wants to accept this link
1064 if( !listener->onLinkRequest(sourcenode) ) {
1065
1066 logging_debug("Link id=" << ld->overlayId.toString() <<
1067 " has been denied by service " << service.toString() << ", dropping link");
1068
1069 // prevent onLinkDown calls to the service
1070 ld->listener = &CommunicationListener::DEFAULT;
1071
1072 // drop the link
1073 dropLink( ld->overlayId );
1074 return true;
1075 }
1076
1077 // set link up
1078 ld->up = true;
1079 logging_debug(
1080 "Link " << ld->overlayId.toString()
1081 << " has been accepted by service " << service.toString()
1082 << " and is now up"
1083 );
1084
1085 // auto links: link has been accepted -> send queued messages
1086 if( ld->messageQueue.size() > 0 ) {
1087 logging_info( "sending out queued messages on link " <<
1088 ld->overlayId.toString() );
1089 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1090 sendMessage( msg, ld->overlayId );
1091 delete msg;
1092 }
1093 ld->messageQueue.clear();
1094 }
1095
1096 // call the notification functions
1097 listener->onLinkUp( ld->overlayId, sourcenode );
1098 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1099
1100 return true;
1101 }
1102
1103 // ---------------------------------------------------------------------
1104 // handle bye messages
1105 // ---------------------------------------------------------------------
1106 case OverlayMsg::typeBye: {
1107 logging_debug( "received bye message from " <<
1108 overlayMsg->getSourceNode().toString() );
1109
1110 /* if we are the initiator and receive a bye from a node
1111 * the node just left. if we are a node and receive a bye
1112 * from the initiator, we have to close, too.
1113 */
1114 if( overlayMsg->getSourceNode() == spovnetInitiator ) {
1115
1116 bc->dropLink( initiatorLink );
1117 initiatorLink = LinkID::UNSPECIFIED;
1118 state = BaseOverlayStateInvalid;
1119
1120 logging_fatal( "initiator ended spovnet" );
1121
1122 // inform all registered services of the event
1123 BOOST_FOREACH( NodeListener* i, nodeListeners )
1124 i->onLeaveFailed( spovnetId );
1125
1126 } else {
1127 // a node that said goodbye and we are the initiator don't have to
1128 // do much here, as the node also will go out of the overlay
1129 // structure
1130 logging_info( "node left " << overlayMsg->getSourceNode() );
1131 }
1132
1133 return true;
1134
1135 }
1136
1137 // ---------------------------------------------------------------------
1138 // handle link request forwarded through the overlay
1139 // ---------------------------------------------------------------------
1140 case OverlayMsg::typeLinkRequest: {
1141
1142 // decapsulate message
1143 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1144 const ServiceID& service = overlayMsg->getService();
1145
1146 // is request reply?
1147 if ( linkReq->isReply() ) {
1148
1149 // find link
1150 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1151 if ( i == pendingLinks.end() ) {
1152 logging_error( "Nonce not found in link request" );
1153 return true;
1154 }
1155
1156 // debug message
1157 logging_debug( "Link request reply received. Establishing link "
1158 << i->second << " to " << (linkReq->getEndpoint()->toString())
1159 << " for service " << service.toString()
1160 << " with nonce " << linkReq->getNonce()
1161 << " using relay " << linkReq->getRelay().toString()
1162 << " and remote link id=" << linkReq->getRemoteLinkId()
1163 );
1164
1165 // get descriptor
1166 LinkDescriptor* ldn = getDescriptor(i->second);
1167
1168 // check if link request reply has a relay node ...
1169 if (!linkReq->getRelay().isUnspecified()) { // yes->
1170 ldn->up = true;
1171 ldn->relay = true;
1172 if (ldn->localRelay.isUnspecified()) {
1173 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1174 showLinkState();
1175 }
1176 ldn->remoteRelay = linkReq->getRelay();
1177 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1178 ldn->remoteNode = overlayMsg->getSourceNode();
1179
1180 ldn->markAlive();
1181
1182 // compile and send update message
1183 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1184 _overlayMsg.setAutoLink(ldn->autolink);
1185 sendMessage( &_overlayMsg, ldn );
1186
1187 // auto links: link has been accepted -> send queued messages
1188 if( ldn->messageQueue.size() > 0 ) {
1189 logging_info( "Sending out queued messages on link " <<
1190 ldn->overlayId.toString() );
1191 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1192 sendMessage( msg, ldn->overlayId );
1193 delete msg;
1194 }
1195 ldn->messageQueue.clear();
1196 }
1197
1198 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1199
1200 // try to establish a direct link
1201 ldn->communicationId =
1202 bc->establishLink( *linkReq->getEndpoint(), i->second );
1203 }
1204
1205 // no relay node-> use overlay routing
1206 else {
1207 ldn->up = true;
1208
1209 // establish direct link
1210 ldn->communicationId =
1211 bc->establishLink( *linkReq->getEndpoint(), i->second );
1212 }
1213 } else {
1214 logging_debug( "Link request received from node id="
1215 << overlayMsg->getSourceNode() );
1216
1217 // create link descriptor
1218 LinkDescriptor* ldn =
1219 createLinkDescriptor(overlayMsg->getSourceNode(),
1220 overlayMsg->getService(), LinkID::UNSPECIFIED );
1221 assert(!ldn->overlayId.isUnspecified());
1222
1223 // create reply message
1224 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1225 LinkRequest link_request_msg(
1226 linkReq->getNonce(),
1227 &bc->getEndpointDescriptor(),
1228 true, ldn->overlayId, ldn->localRelay
1229 );
1230 overlay_msg.encapsulate( &link_request_msg );
1231
1232 // debug message
1233 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1234 linkReq->getNonce() );
1235
1236 // if this is a relay link-> update information & inform listeners
1237 if (!linkReq->getRelay().isUnspecified()) {
1238 // set flags
1239 ldn->up = true;
1240 ldn->relay = true;
1241 if (ldn->localRelay.isUnspecified()) {
1242 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1243 showLinkState();
1244 }
1245 ldn->remoteRelay = linkReq->getRelay();
1246 ldn->remoteNode = overlayMsg->getSourceNode();
1247 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1248 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1249 }
1250
1251 // route message back over overlay
1252 sendMessage( &overlay_msg, ldn );
1253 }
1254 return true;
1255 }
1256
1257 // ---------------------------------------------------------------------
1258 // handle relay message to forward messages
1259 // ---------------------------------------------------------------------
1260 case OverlayMsg::typeRelay: {
1261
1262 // decapsulate message
1263 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1264
1265 // is relay message informative?
1266 switch (relayMsg->getType()) {
1267
1268 // handle relay notification
1269 case RelayMessage::typeInform: {
1270 logging_info("Received relay information message with"
1271 << " relay " << relayMsg->getRelayNode()
1272 << " destination " << relayMsg->getDestNode() );
1273
1274 // mark incoming link as relay
1275 if (ld!=NULL) ld->markAsRelay();
1276
1277 // am I the destination of this message? yes->
1278 if (relayMsg->getDestNode() == nodeId ) {
1279 // deliver relay message locally!
1280 logging_debug("Relay message reached destination. Handling the message.");
1281 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1282 return true;
1283 }
1284
1285 // create route message
1286 OverlayMsg _overMsg( *overlayMsg );
1287 RelayMessage _relayMsg( *relayMsg );
1288 _relayMsg.setType( RelayMessage::typeRoute );
1289 _overMsg.encapsulate( &_relayMsg );
1290
1291 // forward message
1292 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1293 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1294 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1295 } else {
1296 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1297 overlayInterface->routeMessage(relayMsg->getRelayNode(), &_overMsg );
1298 }
1299 return true;
1300 }
1301
1302 // handle relay routing
1303 case RelayMessage::typeRoute: {
1304 logging_info("Received relay route message with"
1305 << " relay " << relayMsg->getRelayNode()
1306 << " destination " << relayMsg->getDestNode() );
1307
1308 // mark incoming link as relay
1309 if (ld!=NULL) ld->markAsRelay();
1310
1311 // am I the destination of this message? yes->
1312 if (relayMsg->getDestNode() == nodeId ) {
1313 // deliver relay message locally!
1314 logging_debug("Relay message reached destination. Handling the message.");
1315 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1316 return true;
1317 }
1318
1319 // am I the relay for this message? yes->
1320 if (relayMsg->getRelayNode() == nodeId ) {
1321 logging_debug("I'm the relay for this message. Sending to destination.");
1322 OverlayMsg _overMsg( *overlayMsg );
1323 RelayMessage _relayMsg( *relayMsg );
1324 _overMsg.encapsulate(&_relayMsg);
1325
1326 /// this must be handled by using relay link!
1327 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg );
1328 return true;
1329 }
1330
1331 // error: I'm not a relay or destination!
1332 logging_error("This node is neither relay nor destination. Dropping Message!");
1333 return true;
1334 }
1335 default: {
1336 logging_error("RelayMessage Unknown!");
1337 return true;
1338 }
1339 }
1340
1341 break;
1342 }
1343
1344 // ---------------------------------------------------------------------
1345 // handle keep-alive messages
1346 // ---------------------------------------------------------------------
1347 case OverlayMsg::typeKeepAlive: {
1348 if ( ld != NULL ) {
1349 //logging_force("Keep-Alive for "<< ld->overlayId);
1350 ld->markAlive();
1351 }
1352 break;
1353 }
1354
1355 // ---------------------------------------------------------------------
1356 // handle direct link replacement messages
1357 // ---------------------------------------------------------------------
1358 case OverlayMsg::typeDirectLink: {
1359 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1360 rld->communicationId = ld->communicationId;
1361 rld->communicationUp = true;
1362 rld->relay = false;
1363 rld->localRelay = NodeID::UNSPECIFIED;
1364 rld->remoteRelay = NodeID::UNSPECIFIED;
1365 eraseDescriptor(ld->overlayId);
1366 break;
1367 }
1368
1369 // ---------------------------------------------------------------------
1370 // handle unknown message type
1371 // ---------------------------------------------------------------------
1372 default: {
1373 logging_error( "received message in invalid state! don't know " <<
1374 "what to do with this message of type " <<
1375 overlayMsg->getType() );
1376 return false;
1377 }
1378
1379 } /* switch */
1380 return false;
1381}
1382
1383// ----------------------------------------------------------------------------
1384
1385void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1386
1387 logging_debug( "broadcasting message to all known nodes " <<
1388 "in the overlay from service " + service.toString() );
1389
1390 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
1391 OverlayInterface::NodeList::iterator i = nodes.begin();
1392 for(; i != nodes.end(); i++ ) {
1393 if( *i == nodeId) continue; // don't send to ourselfs
1394 sendMessage( message, *i, service );
1395 }
1396}
1397
1398vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
1399 // the known nodes _can_ also include our node, so we remove ourself
1400 vector<NodeID> nodes = overlayInterface->getKnownNodes();
1401 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1402 if( i != nodes.end() ) nodes.erase( i );
1403 return nodes;
1404}
1405
1406const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1407 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1408 const LinkDescriptor* ld = getDescriptor(lid);
1409 if( ld == NULL ) return NodeID::UNSPECIFIED;
1410 else return ld->remoteNode;
1411}
1412
1413vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1414 vector<LinkID> linkvector;
1415 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1416 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1417 linkvector.push_back( ld->overlayId );
1418 }
1419 }
1420 return linkvector;
1421}
1422
1423
1424void BaseOverlay::onNodeJoin(const NodeID& node) {
1425 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1426 if( i == joiningNodes.end() ) return;
1427
1428 logging_info( "node has successfully joined baseoverlay and overlay structure "
1429 << node.toString() );
1430
1431 joiningNodes.erase( i );
1432}
1433
1434void BaseOverlay::eventFunction() {
1435
1436 // send keep-alive messages over established links
1437 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1438 if (!ld->up) continue;
1439 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1440 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1441 sendMessage( &overMsg, ld );
1442 }
1443
1444 // iterate over all links and check for time boundaries
1445 vector<LinkDescriptor*> oldlinks;
1446 time_t now = time(NULL);
1447 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1448 // remote used as relay flag
1449 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1450 ld->usedAsRelay = false;
1451
1452 // keep alives missed? yes->
1453 if ( !ld->up && difftime( now, ld->keepAliveTime ) > 2 ) {
1454
1455 // increase counter
1456 ld->keepAliveMissed++;
1457
1458 // missed more than four keep-alive messages (4 sec)? -> drop link
1459 if (ld->keepAliveMissed > 10) {
1460 logging_force( "Link connection request is stale, closing: " << ld );
1461 oldlinks.push_back( ld );
1462 }
1463 }
1464
1465 if (!ld->up) continue;
1466
1467 // drop links that are dropped and not used as relay
1468 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink)
1469 oldlinks.push_back( ld );
1470 else
1471
1472 // auto-link time exceeded?
1473 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 )
1474 oldlinks.push_back( ld );
1475
1476 else
1477
1478 // keep alives missed? yes->
1479 if ( !ld->autolink && difftime( now, ld->keepAliveTime ) > 2 ) {
1480
1481 // increase counter
1482 ld->keepAliveMissed++;
1483
1484 // missed more than four keep-alive messages (4 sec)? -> drop link
1485 if (ld->keepAliveMissed >= 8) {
1486 logging_force( "Link is stale, closing: " << ld );
1487 oldlinks.push_back( ld );
1488 }
1489 }
1490 }
1491
1492 // show link state
1493 counter++;
1494 if (counter>=4) showLinkState();
1495 if (counter>=4 || counter<0) counter = 0;
1496
1497 // drop links
1498 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1499 if (!ld->communicationId.isUnspecified() && ld->communicationId == initiatorLink) {
1500 logging_force( "Not dropping initiator link: " << ld );
1501 continue;
1502 }
1503 logging_force( "Link timed out. Dropping " << ld );
1504 dropLink( ld->overlayId );
1505 }
1506}
1507
1508void BaseOverlay::showLinkState() {
1509 int i=0;
1510 logging_force("--- link state -------------------------------");
1511 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1512 logging_force("link " << i << ": " << ld);
1513 i++;
1514 }
1515 logging_force("----------------------------------------------");
1516}
1517
1518}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.