source: source/ariba/overlay/BaseOverlay.cpp@ 5759

Last change on this file since 5759 was 5759, checked in by mies, 15 years ago
File size: 51.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
63 BOOST_FOREACH( LinkDescriptor* lp, links )
64 if ((communication ? lp->communicationId : lp->overlayId) == link)
65 return lp;
66 return NULL;
67}
68
69const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
70 BOOST_FOREACH( const LinkDescriptor* lp, links )
71 if ((communication ? lp->communicationId : lp->overlayId) == link)
72 return lp;
73 return NULL;
74}
75
76/// erases a link descriptor
77void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
78 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
79 LinkDescriptor* ld = *i;
80 if ((communication ? ld->communicationId : ld->overlayId) == link) {
81 delete ld;
82 links.erase(i);
83 break;
84 }
85 }
86}
87
88/// adds a link descriptor
89LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
90 LinkDescriptor* desc = getDescriptor( link );
91 if ( desc == NULL ) {
92 desc = new LinkDescriptor();
93 desc->overlayId = link;
94 links.push_back(desc);
95 }
96 return desc;
97}
98
99/// returns a auto-link descriptor
100LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
101 BOOST_FOREACH( LinkDescriptor* lp, links )
102 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
103 return lp;
104 BOOST_FOREACH( LinkDescriptor* lp, links )
105 if (lp->autolink && lp->remoteNode == node && lp->service == service )
106 return lp;
107 return NULL;
108}
109
110/// returns a direct local link relay descriptor for the given remote node
111LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& id ) {
112
113 // get used next hop towards node
114 LinkDescriptor* rld = NULL;
115 NodeID relayNode = NodeID::UNSPECIFIED;
116 LinkID rlid = overlayInterface->getNextLinkId(id);
117 if ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
118 // get descriptor of first hop
119 rld = getDescriptor(rlid);
120
121 // is first hop a relay path use local relay
122 if ( rld->relay ) relayNode = rld->localRelay;
123
124 // no-> a proper relay node has been found
125 else relayNode = rld->remoteNode;
126 }
127
128 // if first relay is unknown choose a arbitrary direct node as relay
129 if ( relayNode.isUnspecified() ) {
130 for (size_t i=0; i<links.size(); i++)
131 if (links[i]->up &&
132 links[i]->communicationUp &&
133 !links[i]->relay &&
134 links[i]->keepAliveMissed <= 1 &&
135 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
136 relayNode = links[i]->remoteNode;
137 break;
138 }
139 }
140
141 // no local relay found-> damn!
142 if (relayNode.isUnspecified()) return NULL;
143
144 // get descriptor
145 BOOST_FOREACH( LinkDescriptor* lp, links )
146 if (lp->remoteNode == relayNode &&
147 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
148 lp->relay == false &&
149 lp->up) {
150
151// std::cout << "using relay node: " << lp->remoteNode.toString() << std::endl;
152 return lp;
153 }
154
155 return NULL;
156}
157
158/// returns the link descriptor that is actually used for sending a message over the overöay
159LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) {
160 for (size_t i=0; i<links.size(); i++)
161 if ( !links[i]->relay &&
162 links[i]->up &&
163 links[i]->communicationUp &&
164 links[i]->keepAliveMissed <= 1 &&
165 links[i]->remoteNode == nodeid &&
166 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
167 return links[i];
168 }
169 LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid));
170 if (ld != NULL && ld->relay && follow)
171 return getSendDescriptor(ld->localRelay, false);
172 return NULL;
173}
174
175NodeID BaseOverlay::getRelayNode( const NodeID& remoteNode ) {
176 LinkDescriptor* rld = getRelayDescriptor(remoteNode);
177 return rld!=NULL ? rld->remoteNode : NodeID::UNSPECIFIED;
178}
179
180/// routes a message over the overlay or directly sends it when a link is open
181seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid, const NodeID& remoteRelay ) {
182 /// send message directly to a neighbor
183 for (size_t i=0; i<links.size(); i++)
184 if ( !links[i]->relay &&
185 links[i]->up &&
186 links[i]->communicationUp &&
187 links[i]->keepAliveMissed <= 1 &&
188 links[i]->remoteNode == nodeid &&
189 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
190
191 // mark as relay and send message
192 links[i]->markAsRelay();
193 return sendMessage( message, links[i] );
194 }
195
196 /// send relayed message over the overlay
197 if (!remoteRelay.isUnspecified()) {
198 // create a information relay message to inform the relay about
199 OverlayMsg overlay_msg(
200 OverlayMsg::typeRelay, OverlayInterface::OVERLAY_SERVICE_ID, nodeId);
201 RelayMessage relayMsg( RelayMessage::typeInform, remoteRelay, nodeid, LinkID::UNSPECIFIED );
202 relayMsg.encapsulate( message );
203 overlay_msg.encapsulate( &relayMsg );
204
205 // get local relay link
206 LinkDescriptor* rld = getRelayDescriptor(nodeid);
207
208 // local relay available? send to local relay!
209 if (rld!=NULL) {
210 rld->markAsRelay();
211 sendMessage(&overlay_msg, rld);
212 } else
213 overlayInterface->routeMessage(remoteRelay, &overlay_msg);
214
215 // finished
216 return 0;
217 }
218
219 // common case: send message over the overlay
220 overlayInterface->routeMessage(nodeid, message);
221 return 0;
222}
223
224/// forwards a message over relays/directly using link descriptor
225seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
226
227 // directly send message
228 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
229 logging_debug("Send: Sending message via Base Communication");
230 return bc->sendMessage( ld->communicationId, message );
231 }
232
233 // relay message
234 else if ( ld->relay ) {
235
236 // sending a relayed message
237 logging_debug("Send: Relaying message to node "
238 << ld->remoteNode.toString()
239 << " using relay " << ld->localRelay
240 );
241
242 // create a information relay message to inform the relay about
243 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
244 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
245 relayMsg.encapsulate( message );
246 overlay_msg.encapsulate( &relayMsg );
247
248 // route message to relay node in order to inform it!
249 logging_debug("sendMessage: Sending message over relayed link with" << ld );
250 sendOverlay( &overlay_msg, ld->localRelay );
251 return 0;
252 }
253
254 // error
255 else {
256 logging_error( "Could not send message descriptor=" << ld );
257 return -1;
258 }
259 return -1;
260}
261
262/// creates a link descriptor, apply relay semantics if possible
263LinkDescriptor* BaseOverlay::createLinkDescriptor(
264 const NodeID remoteNode, const ServiceID service, const LinkID link_id ) {
265
266 // find listener
267 if( !communicationListeners.contains( service ) ) {
268 logging_error( "No listener found for service " << service.toString() );
269 return NULL;
270 }
271 CommunicationListener* listener = communicationListeners.get( service );
272 assert( listener != NULL );
273
274 // copy link id
275 LinkID linkid = link_id;
276
277 // create link id if necessary
278 if ( linkid.isUnspecified() )
279 linkid = LinkID::create();
280
281 // create relay link descriptor
282 NodeID relayNode = getRelayNode(remoteNode);
283
284 // add descriptor
285 LinkDescriptor* ld = addDescriptor( linkid );
286 ld->overlayId = linkid;
287 ld->service = service;
288 ld->listener = listener;
289 ld->remoteNode = remoteNode;
290
291 // set relay node if available
292 ld->relay = !relayNode.isUnspecified();
293 ld->localRelay = relayNode;
294
295 if (!ld->relay)
296 logging_error("No relay found!");
297
298 // debug output
299 logging_debug( "Created link descriptor: " << ld );
300
301 return ld;
302}
303
304
305// ----------------------------------------------------------------------------
306
307use_logging_cpp(BaseOverlay);
308
309// ----------------------------------------------------------------------------
310
311BaseOverlay::BaseOverlay() :
312 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
313 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
314 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
315}
316
317BaseOverlay::~BaseOverlay() {
318}
319
320// ----------------------------------------------------------------------------
321
322void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
323 logging_info("Starting...");
324
325 // set parameters
326 bc = &_basecomm;
327 nodeId = _nodeid;
328
329 // register at base communication
330 bc->registerMessageReceiver( this );
331 bc->registerEventListener( this );
332
333 // timer for auto link management
334 Timer::setInterval( 1000 );
335 Timer::start();
336
337 started = true;
338 state = BaseOverlayStateInvalid;
339}
340
341void BaseOverlay::stop() {
342 logging_info("Stopping...");
343
344 // stop timer
345 Timer::stop();
346
347 // delete oberlay interface
348 if(overlayInterface != NULL) {
349 delete overlayInterface;
350 overlayInterface = NULL;
351 }
352
353 // unregister at base communication
354 bc->unregisterMessageReceiver( this );
355 bc->unregisterEventListener( this );
356
357 started = false;
358 state = BaseOverlayStateInvalid;
359}
360
361bool BaseOverlay::isStarted(){
362 return started;
363}
364
365// ----------------------------------------------------------------------------
366
367void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
368 const EndpointDescriptor& bootstrapEp) {
369
370 if(id != spovnetId){
371 logging_error("attempt to join against invalid spovnet, call initiate first");
372 return;
373 }
374
375
376 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
377 logging_info( "Starting to join spovnet " << id.toString() <<
378 " with nodeid " << nodeId.toString());
379
380 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
381
382 // bootstrap against ourselfs
383 logging_debug("joining spovnet locally");
384
385 overlayInterface->joinOverlay();
386 state = BaseOverlayStateCompleted;
387 BOOST_FOREACH( NodeListener* i, nodeListeners )
388 i->onJoinCompleted( spovnetId );
389
390 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
391 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
392
393 logging_debug("starting overlay bootstrap module");
394 overlayBootstrap.start(this, spovnetId, nodeId);
395 overlayBootstrap.publish(bc->getEndpointDescriptor());
396
397 } else {
398
399 // bootstrap against another node
400 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
401
402 const LinkID& lnk = bc->establishLink( bootstrapEp );
403 bootstrapLinks.push_back(lnk);
404 logging_info("join process initiated for " << id.toString() << "...");
405 }
406}
407
408void BaseOverlay::leaveSpoVNet() {
409
410 logging_info( "Leaving spovnet " << spovnetId );
411 bool ret = ( state != this->BaseOverlayStateInvalid );
412
413 logging_debug("stopping overlay bootstrap module");
414 overlayBootstrap.stop();
415 overlayBootstrap.revoke();
416
417 logging_debug( "Dropping all auto-links" );
418
419 // gather all service links
420 vector<LinkID> servicelinks;
421 BOOST_FOREACH( LinkDescriptor* ld, links ) {
422 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
423 servicelinks.push_back( ld->overlayId );
424 }
425
426 // drop all service links
427 BOOST_FOREACH( LinkID lnk, servicelinks )
428 dropLink( lnk );
429
430 // let the node leave the spovnet overlay interface
431 logging_debug( "Leaving overlay" );
432 if( overlayInterface != NULL )
433 overlayInterface->leaveOverlay();
434
435 // drop still open bootstrap links
436 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
437 bc->dropLink( lnk );
438
439 // change to inalid state
440 state = BaseOverlayStateInvalid;
441 //ovl.visShutdown( ovlId, nodeId, string("") );
442
443 // inform all registered services of the event
444 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
445 if( ret ) i->onLeaveCompleted( spovnetId );
446 else i->onLeaveFailed( spovnetId );
447 }
448}
449
450void BaseOverlay::createSpoVNet(const SpoVNetID& id,
451 const OverlayParameterSet& param,
452 const SecurityParameterSet& sec,
453 const QoSParameterSet& qos) {
454
455 // set the state that we are an initiator, this way incoming messages are
456 // handled correctly
457 logging_info( "creating spovnet " + id.toString() <<
458 " with nodeid " << nodeId.toString() );
459
460 spovnetId = id;
461
462 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
463 if( overlayInterface == NULL ) {
464 logging_fatal( "overlay structure not supported" );
465 state = BaseOverlayStateInvalid;
466
467 BOOST_FOREACH( NodeListener* i, nodeListeners )
468 i->onJoinFailed( spovnetId );
469
470 return;
471 }
472}
473
474// ----------------------------------------------------------------------------
475
476const LinkID BaseOverlay::establishLink(
477 const EndpointDescriptor& ep, const NodeID& nodeid,
478 const ServiceID& service, const NodeID& remoteRelay, const LinkID& linkid ) {
479
480 LinkID link_id = linkid;
481
482 // establish link via overlay
483 if (!nodeid.isUnspecified())
484 link_id = establishLink( nodeid, service, remoteRelay, link_id );
485
486 // establish link directly if only ep is known
487 if (nodeid.isUnspecified())
488 establishDirectLink( ep, service, link_id );
489
490 return link_id;
491}
492
493/// call base communication's establish link and add link mapping
494const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
495 const ServiceID& service, const LinkID& linkid ) {
496
497 // create a new link id if necessary
498 LinkID link_id = linkid;
499 if (link_id.isUnspecified()) link_id = LinkID::create();
500
501 /// find a service listener
502 if( !communicationListeners.contains( service ) ) {
503 logging_error( "No listener registered for service id=" << service.toString() );
504 return LinkID::UNSPECIFIED;
505 }
506 CommunicationListener* listener = communicationListeners.get( service );
507 assert( listener != NULL );
508
509 /// establish link and add mapping
510 logging_info("Establishing direct link " << link_id.toString()
511 << " using " << ep.toString());
512
513 // create descriptor
514 LinkDescriptor* ld = addDescriptor( link_id );
515 ld->overlayId = link_id;
516 ld->communicationId = link_id;
517 ld->listener = listener;
518 ld->service = service;
519 bc->establishLink( ep, link_id );
520
521 return link_id;
522}
523
524/// establishes a link between two arbitrary nodes
525const LinkID BaseOverlay::establishLink( const NodeID& node,
526 const ServiceID& service, const NodeID& remoteRelay, const LinkID& link_id ) {
527
528 // do not establish a link to myself!
529 if (node == nodeId) return LinkID::UNSPECIFIED;
530
531 // create a link descriptor
532 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
533 ld->remoteRelay = remoteRelay;
534
535 // create link request message with own link id
536 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
537 LinkRequest link_request_msg(
538 nonce, &bc->getEndpointDescriptor(), false,
539 ld->overlayId, ld->localRelay );
540 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
541 overlay_msg.encapsulate( &link_request_msg );
542 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
543
544 // debug message
545 logging_debug(
546 "Sending link request with"
547 << " link id=" << ld->overlayId
548 << " node id=" << ld->remoteNode.toString()
549 << " service id=" << ld->service.toString()
550 << " local relay id=" << ld->localRelay.toString()
551 << " nonce= " << nonce
552 );
553
554 // sending message through new link
555 sendMessage( &overlay_msg, ld );
556
557 return ld->overlayId;
558}
559
560/// drops an established link
561void BaseOverlay::dropLink(const LinkID& link) {
562 logging_debug( "Dropping link (initiated locally):" << link.toString() );
563
564 // find the link item to drop
565 LinkDescriptor* ld = getDescriptor(link);
566 if( ld == NULL ) {
567 logging_warn( "Can't drop link, link is unknown!");
568 return;
569 }
570
571 // delete all queued messages
572 if( ld->messageQueue.size() > 0 ) {
573 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
574 << ld->messageQueue.size() << " waiting messages" );
575 ld->flushQueue();
576 }
577
578 // inform sideport and listener
579 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
580 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
581
582 // do not drop relay links
583 if (!ld->usedAsRelay) {
584 // drop the link in base communication
585 if (ld->communicationUp) bc->dropLink( ld->communicationId );
586
587 // erase descriptor
588 eraseDescriptor( ld->overlayId );
589 } else
590 ld->dropWhenRelaysLeft = true;
591}
592
593// ----------------------------------------------------------------------------
594
595/// internal send message, always use this functions to send messages over links
596seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
597 logging_debug( "Sending data message on link " << link.toString() );
598
599 // get the mapping for this link
600 LinkDescriptor* ld = getDescriptor(link);
601 if( ld == NULL ) {
602 logging_error("Could not send message. "
603 << "Link not found id=" << link.toString());
604 return -1;
605 }
606
607 // check if the link is up yet, if its an auto link queue message
608 if( !ld->up ) {
609 ld->markAsUsed();
610 if( ld->autolink ) {
611 logging_info("Auto-link " << link.toString() << " not up, queue message");
612 Data data = data_serialize( message );
613 const_cast<Message*>(message)->dropPayload();
614 ld->messageQueue.push_back( new Message(data) );
615 } else {
616 logging_error("Link " << link.toString() << " not up, drop message");
617 }
618 return -1;
619 }
620
621 // compile overlay message (has service and node id)
622 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
623 overmsg.encapsulate( const_cast<Message*>(message) );
624
625 // send message over relay/direct/overlay
626 return sendMessage( &overmsg, ld );
627}
628
629seqnum_t BaseOverlay::sendMessage(const Message* message,
630 const NodeID& node, const ServiceID& service) {
631
632 // find link for node and service
633 LinkDescriptor* ld = getAutoDescriptor( node, service );
634
635 // if we found no link, create an auto link
636 if( ld == NULL ) {
637
638 // debug output
639 logging_info( "No link to send message to node "
640 << node.toString() << " found for service "
641 << service.toString() << ". Creating auto link ..."
642 );
643
644 // this will call onlinkup on us, if everything worked we now have a mapping
645 LinkID link = LinkID::create();
646
647 // call base overlay to create a link
648 link = establishLink( node, service, NodeID::UNSPECIFIED, link );
649 ld = getDescriptor( link );
650 if( ld == NULL ) {
651 logging_error( "Failed to establish auto-link.");
652 return -1;
653 }
654 ld->autolink = true;
655
656 logging_debug( "Auto-link establishment in progress to node "
657 << node.toString() << " with link id=" << link.toString() );
658 }
659 assert(ld != NULL);
660
661 // mark the link as used, as we now send a message through it
662 ld->markAsUsed();
663
664 // send / queue message
665 return sendMessage( message, ld->overlayId );
666}
667
668// ----------------------------------------------------------------------------
669
670const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
671 const LinkID& link) const {
672
673 // return own end-point descriptor
674 if( link == LinkID::UNSPECIFIED )
675 return bc->getEndpointDescriptor();
676
677 // find link descriptor. not found -> return unspecified
678 const LinkDescriptor* ld = getDescriptor(link);
679 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
680
681 // return endpoint-descriptor from base communication
682 return bc->getEndpointDescriptor( ld->communicationId );
683}
684
685const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
686 const NodeID& node) const {
687
688 // return own end-point descriptor
689 if( node == nodeId || node == NodeID::UNSPECIFIED )
690 return bc->getEndpointDescriptor();
691
692 // no joined and request remote descriptor? -> fail!
693 if( overlayInterface == NULL ) {
694 logging_error( "overlay interface not set, cannot resolve endpoint" );
695 return EndpointDescriptor::UNSPECIFIED();
696 }
697
698 // resolve end-point descriptor from the base-overlay routing table
699 return overlayInterface->resolveNode( node );
700}
701
702// ----------------------------------------------------------------------------
703
704bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
705 sideport = _sideport;
706 _sideport->configure( this );
707}
708
709bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
710 sideport = &SideportListener::DEFAULT;
711}
712
713// ----------------------------------------------------------------------------
714
715bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
716 logging_debug( "binding communication listener " << listener
717 << " on serviceid " << sid.toString() );
718
719 if( communicationListeners.contains( sid ) ) {
720 logging_error( "some listener already registered for service id "
721 << sid.toString() );
722 return false;
723 }
724
725 communicationListeners.registerItem( listener, sid );
726 return true;
727}
728
729
730bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
731 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
732
733 if( !communicationListeners.contains( sid ) ) {
734 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
735 return false;
736 }
737
738 if( communicationListeners.get(sid) != listener ) {
739 logging_warn( "listener bound to service id " << sid.toString()
740 << " is different than listener trying to unbind" );
741 return false;
742 }
743
744 communicationListeners.unregisterItem( sid );
745 return true;
746}
747
748// ----------------------------------------------------------------------------
749
750bool BaseOverlay::bind(NodeListener* listener) {
751 logging_debug( "Binding node listener " << listener );
752
753 // already bound? yes-> warning
754 NodeListenerVector::iterator i =
755 find( nodeListeners.begin(), nodeListeners.end(), listener );
756 if( i != nodeListeners.end() ) {
757 logging_warn("Node listener " << listener << " is already bound!" );
758 return false;
759 }
760
761 // no-> add
762 nodeListeners.push_back( listener );
763 return true;
764}
765
766bool BaseOverlay::unbind(NodeListener* listener) {
767 logging_debug( "Unbinding node listener " << listener );
768
769 // already unbound? yes-> warning
770 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
771 if( i == nodeListeners.end() ) {
772 logging_warn( "Node listener " << listener << " is not bound!" );
773 return false;
774 }
775
776 // no-> remove
777 nodeListeners.erase( i );
778 return true;
779}
780
781// ----------------------------------------------------------------------------
782
783void BaseOverlay::onLinkUp(const LinkID& id,
784 const address_v* local, const address_v* remote) {
785 logging_debug( "Link up with base communication link id=" << id );
786
787 // get descriptor for link
788 LinkDescriptor* ld = getDescriptor(id, true);
789
790 // handle bootstrap link we initiated
791 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
792 logging_info(
793 "Join has been initiated by me and the link is now up. " <<
794 "Sending out join request for SpoVNet " << spovnetId.toString()
795 );
796
797 // send join request message
798 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
799 JoinRequest joinRequest( spovnetId, nodeId );
800 overlayMsg.encapsulate( &joinRequest );
801 bc->sendMessage( id, &overlayMsg );
802 return;
803 }
804
805 // no link found? -> link establishment from remote, add one!
806 if (ld == NULL) {
807 ld = addDescriptor( id );
808 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
809
810 // update descriptor
811 ld->fromRemote = true;
812 ld->communicationId = id;
813 ld->communicationUp = true;
814 ld->markAsUsed();
815
816 // in this case, do not inform listener, since service it unknown
817 // -> wait for update message!
818
819 // link mapping found? -> send update message with node-id and service id
820 } else {
821 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
822
823 // note: necessary to validate the link on the remote side!
824 logging_debug( "Sending out update" <<
825 " for service " << ld->service.toString() <<
826 " with local node id " << nodeId.toString() <<
827 " on link " << ld->overlayId.toString() );
828
829 // update descriptor
830 ld->markAsUsed();
831 ld->communicationUp = true;
832
833 // if link is a relayed link ->convert to direct link
834 if (ld->relay && !ld->remoteLinkId.isUnspecified() ) {
835 logging_info( "Converting to direct link: " << ld );
836 ld->up = true;
837 ld->relay = false;
838 ld->localRelay = NodeID::UNSPECIFIED;
839 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
840 overMsg.setRelayLink( ld->remoteLinkId );
841 bc->sendMessage( ld->communicationId, &overMsg );
842 }
843
844 // compile and send update message
845 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
846 overlayMsg.setAutoLink( ld->autolink );
847 bc->sendMessage( ld->communicationId, &overlayMsg );
848 }
849}
850
851void BaseOverlay::onLinkDown(const LinkID& id,
852 const address_v* local, const address_v* remote) {
853
854 // erase bootstrap links
855 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
856 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
857
858 // get descriptor for link
859 LinkDescriptor* ld = getDescriptor(id, true);
860 if ( ld == NULL ) return; // not found? ->ignore!
861 logging_info( "onLinkDown descriptor: " << ld );
862
863 // inform listeners about link down
864 ld->communicationUp = false;
865 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
866 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
867
868 // delete all queued messages (auto links)
869 if( ld->messageQueue.size() > 0 ) {
870 logging_warn( "Dropping link " << id.toString() << " that has "
871 << ld->messageQueue.size() << " waiting messages" );
872 ld->flushQueue();
873 }
874
875 // erase mapping
876 eraseDescriptor(ld->overlayId);
877}
878
879void BaseOverlay::onLinkChanged(const LinkID& id,
880 const address_v* oldlocal, const address_v* newlocal,
881 const address_v* oldremote, const address_v* newremote) {
882
883 // get descriptor for link
884 LinkDescriptor* ld = getDescriptor(id, true);
885 if ( ld == NULL ) return; // not found? ->ignore!
886 logging_debug( "onLinkChanged descriptor: " << ld );
887
888 // inform listeners
889 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
890 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
891
892 // autolinks: refresh timestamp
893 ld->markAsUsed();
894}
895
896void BaseOverlay::onLinkFail(const LinkID& id,
897 const address_v* local, const address_v* remote) {
898 logging_debug( "Link fail with base communication link id=" << id );
899
900 // erase bootstrap links
901 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
902 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
903
904 // get descriptor for link
905 LinkDescriptor* ld = getDescriptor(id, true);
906 if ( ld == NULL ) return; // not found? ->ignore!
907 logging_debug( "Link failed id=" << ld->overlayId.toString() );
908
909 // inform listeners
910 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
911 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
912
913 // autolinks: refresh timestamp
914 ld->markAsUsed();
915}
916
917void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
918 const address_v* remote, const QoSParameterSet& qos) {
919 logging_debug( "Link quality changed with base communication link id=" << id );
920
921 // get descriptor for link
922 LinkDescriptor* ld = getDescriptor(id, true);
923 if ( ld == NULL ) return; // not found? ->ignore!
924 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
925
926 // autolinks: refresh timestamp
927 ld->markAsUsed();
928}
929
930bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
931 const address_v* remote ) {
932 logging_debug("Accepting link request from " << remote->to_string() );
933 return true;
934}
935
936/// handles a message from base communication
937bool BaseOverlay::receiveMessage(const Message* message,
938 const LinkID& link, const NodeID& ) {
939 // get descriptor for link
940 LinkDescriptor* ld = getDescriptor( link, true );
941
942 // link known?
943 if (ld == NULL) { // no-> handle with unspecified params
944 logging_debug("Received message from base communication, link descriptor unknown" );
945 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
946 } else { // yes -> handle with overlay link id
947 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
948 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
949 }
950}
951
952// ----------------------------------------------------------------------------
953
954/// handles a message from an overlay
955void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
956 logging_debug("Received message from overlay -- "
957 << " link id=" << link.toString()
958 << " node id=" << source.toString() );
959 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
960}
961
962// ----------------------------------------------------------------------------
963
964/// handles an incoming message
965bool BaseOverlay::handleMessage( const Message* message,
966 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
967 logging_debug( "Handling message: " << message->toString());
968
969 bool ret = false;
970
971 // decapsulate overlay message
972 OverlayMsg* overlayMsg =
973 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
974 if( overlayMsg == NULL ) return false;
975
976 // mark the link as in action
977 LinkDescriptor* ld = getDescriptor(boLink);
978 if (ld == NULL) ld = getDescriptor(bcLink, true);
979 if (ld != NULL) {
980 ld->markAsUsed();
981 ld->markAlive();
982 }
983
984 switch ( overlayMsg->getType() ) {
985 // ---------------------------------------------------------------------
986 // Handle spovnet instance join requests
987 // ---------------------------------------------------------------------
988 case OverlayMsg::typeJoinRequest: {
989
990 // decapsulate message
991 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
992 logging_info( "Received join request for spovnet " <<
993 joinReq->getSpoVNetID().toString() );
994
995 // check spovnet id
996 if( joinReq->getSpoVNetID() != spovnetId ) {
997 logging_error(
998 "Received join request for spovnet we don't handle " <<
999 joinReq->getSpoVNetID().toString() );
1000 ret = false;
1001 break;
1002 }
1003
1004 // TODO: here you can implement mechanisms to deny joining of a node
1005 bool allow = true;
1006 logging_info( "Sending join reply for spovnet " <<
1007 spovnetId.toString() << " to node " <<
1008 overlayMsg->getSourceNode().toString() <<
1009 ". Result: " << (allow ? "allowed" : "denied") );
1010 joiningNodes.push_back( overlayMsg->getSourceNode() );
1011
1012 // return overlay parameters
1013 assert( overlayInterface != NULL );
1014 logging_debug( "Using bootstrap end-point "
1015 << getEndpointDescriptor().toString() )
1016 OverlayParameterSet parameters = overlayInterface->getParameters();
1017 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
1018 JoinReply replyMsg( spovnetId, parameters,
1019 allow, getEndpointDescriptor() );
1020 retmsg.encapsulate(&replyMsg);
1021 bc->sendMessage( bcLink, &retmsg );
1022 ret = true;
1023 break;
1024 }
1025
1026 // ---------------------------------------------------------------------
1027 // handle replies to spovnet instance join requests
1028 // ---------------------------------------------------------------------
1029 case OverlayMsg::typeJoinReply: {
1030
1031 // decapsulate message
1032 logging_debug("received join reply message");
1033 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1034
1035 // correct spovnet?
1036 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1037 logging_error( "Received SpoVNet join reply for " <<
1038 replyMsg->getSpoVNetID().toString() <<
1039 " != " << spovnetId.toString() );
1040 ret = false;
1041 delete replyMsg;
1042 break;
1043 }
1044
1045 // access granted? no -> fail
1046 if( !replyMsg->getJoinAllowed() ) {
1047 logging_error( "Our join request has been denied" );
1048
1049 // drop initiator link
1050 if(bcLink != LinkID::UNSPECIFIED){
1051 bc->dropLink( bcLink );
1052
1053 vector<LinkID>::iterator it = std::find(
1054 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1055 if( it != bootstrapLinks.end() )
1056 bootstrapLinks.erase(it);
1057 }
1058
1059 // inform all registered services of the event
1060 BOOST_FOREACH( NodeListener* i, nodeListeners )
1061 i->onJoinFailed( spovnetId );
1062
1063 ret = true;
1064 delete replyMsg;
1065 break;
1066 }
1067
1068 // access has been granted -> continue!
1069 logging_info("Join request has been accepted for spovnet " <<
1070 spovnetId.toString() );
1071
1072 logging_debug( "Using bootstrap end-point "
1073 << replyMsg->getBootstrapEndpoint().toString() );
1074
1075 //
1076 // create overlay structure from spovnet parameter set
1077 // if we have not boostrapped yet against some other node
1078 //
1079
1080 if( overlayInterface == NULL ){
1081
1082 logging_debug("first-time bootstrapping");
1083
1084 overlayInterface = OverlayFactory::create(
1085 *this, replyMsg->getParam(), nodeId, this );
1086
1087 // overlay structure supported? no-> fail!
1088 if( overlayInterface == NULL ) {
1089 logging_error( "overlay structure not supported" );
1090
1091 if(bcLink != LinkID::UNSPECIFIED){
1092 bc->dropLink( bcLink );
1093
1094 vector<LinkID>::iterator it = std::find(
1095 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1096 if( it != bootstrapLinks.end() )
1097 bootstrapLinks.erase(it);
1098 }
1099
1100 // inform all registered services of the event
1101 BOOST_FOREACH( NodeListener* i, nodeListeners )
1102 i->onJoinFailed( spovnetId );
1103
1104 delete replyMsg;
1105 ret = true;
1106 break;
1107 }
1108
1109 // everything ok-> join the overlay!
1110 state = BaseOverlayStateCompleted;
1111 overlayInterface->createOverlay();
1112
1113 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1114
1115 // update ovlvis
1116 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1117
1118 // inform all registered services of the event
1119 BOOST_FOREACH( NodeListener* i, nodeListeners )
1120 i->onJoinCompleted( spovnetId );
1121
1122 delete replyMsg;
1123
1124 } else {
1125
1126 // this is not the first bootstrap, just join the additional node
1127 logging_debug("not first-time bootstrapping");
1128 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1129
1130 delete replyMsg;
1131
1132 } // if( overlayInterface == NULL )
1133
1134 ret = true;
1135 break;
1136 }
1137
1138 // ---------------------------------------------------------------------
1139 // handle data forward messages
1140 // ---------------------------------------------------------------------
1141 case OverlayMsg::typeData: {
1142
1143 // get service
1144 const ServiceID& service = overlayMsg->getService();
1145 logging_debug( "received data for service " << service.toString() );
1146
1147 // find listener
1148 CommunicationListener* listener =
1149 communicationListeners.get( service );
1150 if( listener == NULL ) {
1151 ret = true;
1152 break;
1153 }
1154
1155 // delegate data message
1156 listener->onMessage( overlayMsg,
1157 overlayMsg->getSourceNode(), ld->overlayId );
1158
1159 ret = true;
1160 break;
1161 }
1162
1163 // ---------------------------------------------------------------------
1164 // handle update messages for link establishment
1165 // ---------------------------------------------------------------------
1166 case OverlayMsg::typeUpdate: {
1167 // get info
1168 const NodeID& sourcenode = overlayMsg->getSourceNode();
1169 const ServiceID& service = overlayMsg->getService();
1170
1171 // no link descriptor available -> error!
1172 if( ld == NULL ) {
1173 logging_warn( "received overlay update message for link for "
1174 << "which we have no mapping" );
1175 ret = false;
1176 break;
1177 }
1178 logging_debug("Received type update message on link " << ld );
1179
1180 // update our link mapping information for this link
1181 bool changed =
1182 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1183 ld->remoteNode = sourcenode;
1184 ld->service = service;
1185 ld->autolink = overlayMsg->isAutoLink();
1186
1187 // if our link information changed, we send out an update, too
1188 if( changed ) {
1189 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1190 overMsg.setAutoLink(ld->autolink);
1191 bc->sendMessage( ld->communicationId, &overMsg );
1192 }
1193
1194 // service registered? no-> error!
1195 if( !communicationListeners.contains( service ) ) {
1196 logging_warn( "Link up: event listener has not been registered" );
1197 ret = false;
1198 break;
1199 }
1200
1201 // default or no service registered?
1202 CommunicationListener* listener = communicationListeners.get( service );
1203 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1204 logging_warn("Link up: event listener is default or null!" );
1205 ret = true;
1206 break;
1207 }
1208
1209 // update descriptor
1210 ld->listener = listener;
1211 ld->markAsUsed();
1212 ld->markAlive();
1213
1214 // ask the service whether it wants to accept this link
1215 if( !listener->onLinkRequest(sourcenode) ) {
1216
1217 logging_debug("Link id=" << ld->overlayId.toString() <<
1218 " has been denied by service " << service.toString() << ", dropping link");
1219
1220 // prevent onLinkDown calls to the service
1221 ld->listener = &CommunicationListener::DEFAULT;
1222
1223 // drop the link
1224 dropLink( ld->overlayId );
1225 ret = true;
1226 break;
1227 }
1228
1229 // set link up
1230 ld->up = true;
1231 logging_debug(
1232 "Link " << ld->overlayId.toString()
1233 << " has been accepted by service " << service.toString()
1234 << " and is now up"
1235 );
1236
1237 // auto links: link has been accepted -> send queued messages
1238 if( ld->messageQueue.size() > 0 ) {
1239 logging_info( "sending out queued messages on link " <<
1240 ld->overlayId.toString() );
1241 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1242 sendMessage( msg, ld->overlayId );
1243 delete msg;
1244 }
1245 ld->messageQueue.clear();
1246 }
1247
1248 // call the notification functions
1249 listener->onLinkUp( ld->overlayId, sourcenode );
1250 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1251
1252 ret = true;
1253 break;
1254 }
1255
1256 // ---------------------------------------------------------------------
1257 // handle link request forwarded through the overlay
1258 // ---------------------------------------------------------------------
1259 case OverlayMsg::typeLinkRequest: {
1260
1261 logging_debug( "received link request on link" );
1262
1263 // decapsulate message
1264 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1265 const ServiceID& service = overlayMsg->getService();
1266
1267 // is request reply?
1268 if ( linkReq->isReply() ) {
1269
1270 // find link
1271 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1272 if ( i == pendingLinks.end() ) {
1273 logging_error( "Nonce not found in link request" );
1274 ret = true;
1275 break;
1276 }
1277
1278 // debug message
1279 logging_debug( "Link request reply received. Establishing link "
1280 << i->second << " to " << (linkReq->getEndpoint()->toString())
1281 << " for service " << service.toString()
1282 << " with nonce " << linkReq->getNonce()
1283 << " using relay " << linkReq->getRelay().toString()
1284 << " and remote link id=" << linkReq->getRemoteLinkId()
1285 );
1286
1287 // get descriptor
1288 LinkDescriptor* ldn = getDescriptor(i->second);
1289 if (ldn==NULL) {
1290 delete linkReq;
1291 ret = true;
1292 break;
1293 }
1294
1295 // check if link request reply has a relay node ...
1296 if (!linkReq->getRelay().isUnspecified()) { // yes->
1297 ldn->up = true;
1298 ldn->relay = true;
1299 if (ldn->localRelay.isUnspecified()) {
1300 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1301 showLinkState();
1302 }
1303 ldn->remoteRelay = linkReq->getRelay();
1304 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1305 ldn->remoteNode = overlayMsg->getSourceNode();
1306
1307 ldn->markAlive();
1308
1309 // compile and send update message
1310 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1311 _overlayMsg.setAutoLink(ldn->autolink);
1312 sendMessage( &_overlayMsg, ldn );
1313
1314 // auto links: link has been accepted -> send queued messages
1315 if( ldn->messageQueue.size() > 0 ) {
1316 logging_info( "Sending out queued messages on link " <<
1317 ldn->overlayId.toString() );
1318 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1319 sendMessage( msg, ldn->overlayId );
1320 delete msg;
1321 }
1322 ldn->messageQueue.clear();
1323 }
1324
1325 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1326
1327 // try to establish a direct link
1328 ldn->communicationId =
1329 bc->establishLink( *linkReq->getEndpoint(), i->second );
1330 }
1331
1332 // no relay node-> use overlay routing
1333 else {
1334 ldn->up = true;
1335
1336 // establish direct link
1337 ldn->communicationId =
1338 bc->establishLink( *linkReq->getEndpoint(), i->second );
1339 }
1340 } else {
1341 logging_debug( "Link request received from node id="
1342 << overlayMsg->getSourceNode() );
1343
1344 // create link descriptor
1345 LinkDescriptor* ldn =
1346 createLinkDescriptor(overlayMsg->getSourceNode(),
1347 overlayMsg->getService(), LinkID::UNSPECIFIED );
1348 assert(!ldn->overlayId.isUnspecified());
1349
1350 // create reply message
1351 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1352 LinkRequest link_request_msg(
1353 linkReq->getNonce(),
1354 &bc->getEndpointDescriptor(),
1355 true, ldn->overlayId, ldn->localRelay
1356 );
1357 overlay_msg.encapsulate( &link_request_msg );
1358
1359 // debug message
1360 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1361 linkReq->getNonce() );
1362
1363 // if this is a relay link-> update information & inform listeners
1364 if (!linkReq->getRelay().isUnspecified()) {
1365 // set flags
1366 ldn->up = true;
1367 ldn->relay = true;
1368 if (ldn->localRelay.isUnspecified()) {
1369 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1370 showLinkState();
1371 }
1372 ldn->remoteRelay = linkReq->getRelay();
1373 ldn->remoteNode = overlayMsg->getSourceNode();
1374 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1375 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1376 }
1377
1378 // route message back over overlay
1379 sendMessage( &overlay_msg, ldn );
1380 }
1381 delete linkReq;
1382 ret = true;
1383 break;
1384 }
1385
1386 // ---------------------------------------------------------------------
1387 // handle relay message to forward messages
1388 // ---------------------------------------------------------------------
1389 case OverlayMsg::typeRelay: {
1390
1391 logging_debug( "received relay request on link" );
1392
1393 // decapsulate message
1394 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1395
1396 // is relay message informative?
1397 switch (relayMsg->getType()) {
1398
1399 // handle relay notification
1400 case RelayMessage::typeInform: {
1401 logging_info("Received relay information message with"
1402 << " relay " << relayMsg->getRelayNode()
1403 << " destination " << relayMsg->getDestNode() );
1404
1405 // mark incoming link as relay
1406 if (ld!=NULL) ld->markAsRelay();
1407
1408 // am I the destination of this message? yes->
1409 if (relayMsg->getDestNode() == nodeId ) {
1410 // deliver relay message locally!
1411 logging_debug("Relay message reached destination. Handling the message.");
1412 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1413 ret = true;
1414 break;
1415 }
1416
1417 // create route message
1418 OverlayMsg _overMsg( *overlayMsg );
1419 relayMsg.setType( RelayMessage::typeRoute );
1420 _overMsg.encapsulate( relayMsg );
1421
1422 // forward message
1423 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1424 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1425 sendOverlay( &_overMsg, relayMsg->getDestNode() );
1426 } else {
1427 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1428 sendOverlay( &_overMsg, relayMsg->getRelayNode() );
1429 }
1430 ret = true;
1431 break;
1432 }
1433
1434 // handle relay routing
1435 case RelayMessage::typeRoute: {
1436 logging_info("Received relay route message with"
1437 << " relay " << relayMsg->getRelayNode()
1438 << " destination " << relayMsg->getDestNode() );
1439
1440 // mark incoming link as relay
1441 if (ld!=NULL) ld->markAsRelay();
1442
1443 // am I the destination of this message? yes->
1444 if (relayMsg->getDestNode() == nodeId ) {
1445 // deliver relay message locally!
1446 logging_debug("Relay message reached destination. Handling the message.");
1447 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1448 ret = true;
1449 break;
1450 }
1451
1452 // am I the relay for this message? yes->
1453 if (relayMsg->getRelayNode() == nodeId ) {
1454 logging_debug("I'm the relay for this message. Sending to destination.");
1455 OverlayMsg _overMsg( *overlayMsg );
1456 _overMsg.encapsulate(&relayMsg);
1457
1458 /// this must be handled by using relay link!
1459 sendOverlay(&_overMsg, relayMsg->getDestNode());
1460 ret = true;
1461 break;
1462 }
1463
1464 // error: I'm not a relay or destination!
1465 logging_error("This node is neither relay nor destination. Dropping Message!");
1466 ret = true;
1467 break;
1468 }
1469 default: {
1470 logging_error("RelayMessage Unknown!");
1471 ret = true;
1472 break;
1473 }
1474 }
1475 delete relayMsg;
1476 break;
1477 }
1478
1479 // ---------------------------------------------------------------------
1480 // handle keep-alive messages
1481 // ---------------------------------------------------------------------
1482 case OverlayMsg::typeKeepAlive: {
1483 logging_debug( "received keep-alive on link" );
1484 if ( ld != NULL ) {
1485 logging_info("Keep-Alive for "<< ld->overlayId);
1486 ld->markAlive();
1487 }
1488 break;
1489 }
1490
1491 // ---------------------------------------------------------------------
1492 // handle direct link replacement messages
1493 // ---------------------------------------------------------------------
1494 case OverlayMsg::typeDirectLink: {
1495
1496 logging_debug( "Received direct link replacement request" );
1497
1498 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1499 if (rld == NULL || ld == NULL) {
1500 logging_error("Direct link replacement: Link "
1501 << overlayMsg->getRelayLink() << "not found error." );
1502 break;
1503 }
1504 logging_info( "Received direct link convert notification for " << rld );
1505
1506 // set communcation link id and set it up
1507 rld->communicationId = ld->communicationId;
1508
1509 // this is neccessary since this link was a relay link before!
1510 rld->communicationUp = true;
1511
1512 // this is not a relay link anymore!
1513 rld->relay = false;
1514 rld->localRelay = NodeID::UNSPECIFIED;
1515 rld->remoteRelay = NodeID::UNSPECIFIED;
1516
1517 // mark used and alive!
1518 rld->markAsUsed();
1519 rld->markAlive();
1520
1521 // erase the original descriptor
1522 eraseDescriptor(ld->overlayId);
1523 break;
1524 }
1525
1526 // ---------------------------------------------------------------------
1527 // handle unknown message type
1528 // ---------------------------------------------------------------------
1529 default: {
1530 logging_error( "received message in invalid state! don't know " <<
1531 "what to do with this message of type " <<
1532 overlayMsg->getType() );
1533 ret = false;
1534 break;
1535 }
1536 } /* switch */
1537
1538 delete overlayMsg;
1539 return ret;
1540}
1541
1542// ----------------------------------------------------------------------------
1543
1544void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1545
1546 logging_debug( "broadcasting message to all known nodes " <<
1547 "in the overlay from service " + service.toString() );
1548
1549 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1550 OverlayInterface::NodeList::iterator i = nodes.begin();
1551 for(; i != nodes.end(); i++ ) {
1552 if( *i == nodeId) continue; // don't send to ourselfs
1553 sendMessage( message, *i, service );
1554 }
1555}
1556
1557vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1558 // the known nodes _can_ also include our node, so we remove ourself
1559 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1560
1561 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1562 if( i != nodes.end() ) nodes.erase( i );
1563
1564 return nodes;
1565}
1566
1567const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1568 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1569 const LinkDescriptor* ld = getDescriptor(lid);
1570 if( ld == NULL ) return NodeID::UNSPECIFIED;
1571 else return ld->remoteNode;
1572}
1573
1574vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1575 vector<LinkID> linkvector;
1576 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1577 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1578 linkvector.push_back( ld->overlayId );
1579 }
1580 }
1581 return linkvector;
1582}
1583
1584
1585void BaseOverlay::onNodeJoin(const NodeID& node) {
1586 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1587 if( i == joiningNodes.end() ) return;
1588
1589 logging_info( "node has successfully joined baseoverlay and overlay structure "
1590 << node.toString() );
1591
1592 joiningNodes.erase( i );
1593}
1594
1595void BaseOverlay::eventFunction() {
1596
1597 // send keep-alive messages over established links
1598 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1599 if (!ld->up) continue;
1600 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1601 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1602 sendMessage( &overMsg, ld );
1603 }
1604
1605 // iterate over all links and check for time boundaries
1606 vector<LinkDescriptor*> oldlinks;
1607 time_t now = time(NULL);
1608 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1609 // remote used as relay flag
1610 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1611 ld->usedAsRelay = false;
1612
1613 // keep alives and not up? yes-> link connection request is stale!
1614 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1615
1616 // increase counter
1617 ld->keepAliveMissed++;
1618
1619 // missed more than four keep-alive messages (10 sec)? -> drop link
1620 if (ld->keepAliveMissed > 4) {
1621 logging_info( "Link connection request is stale, closing: " << ld );
1622 oldlinks.push_back( ld );
1623 continue;
1624 }
1625 }
1626
1627 if (!ld->up) continue;
1628
1629 // drop links that are dropped and not used as relay
1630 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1631 oldlinks.push_back( ld );
1632 continue;
1633 }
1634
1635 // auto-link time exceeded?
1636 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1637 oldlinks.push_back( ld );
1638 continue;
1639 }
1640
1641 // keep alives missed? yes->
1642 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1643
1644 // increase counter
1645 ld->keepAliveMissed++;
1646
1647 // missed more than four keep-alive messages (4 sec)? -> drop link
1648 if (ld->keepAliveMissed >= 8) {
1649 logging_info( "Link is stale, closing: " << ld );
1650 oldlinks.push_back( ld );
1651 continue;
1652 }
1653 }
1654 }
1655
1656 // drop links
1657 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1658/*
1659 vector<LinkID>::iterator it = std::find(
1660 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1661
1662 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1663 logging_info( "Not dropping initiator link: " << ld );
1664 continue;
1665 }
1666*/
1667 logging_info( "Link timed out. Dropping " << ld );
1668 dropLink( ld->overlayId );
1669 }
1670
1671 // show link state
1672 counter++;
1673 if (counter>=4) showLinkState();
1674 if (counter>=4 || counter<0) counter = 0;
1675}
1676
1677void BaseOverlay::showLinkState() {
1678 int i=0;
1679 logging_info("--- link state -------------------------------");
1680 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1681 logging_info("link " << i << ": " << ld);
1682 i++;
1683 }
1684 logging_info("----------------------------------------------");
1685}
1686
1687}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.