close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 5665

Last change on this file since 5665 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 50.2 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
63 BOOST_FOREACH( LinkDescriptor* lp, links )
64 if ((communication ? lp->communicationId : lp->overlayId) == link)
65 return lp;
66 return NULL;
67}
68
69const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
70 BOOST_FOREACH( const LinkDescriptor* lp, links )
71 if ((communication ? lp->communicationId : lp->overlayId) == link)
72 return lp;
73 return NULL;
74}
75
76LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
77 BOOST_FOREACH( LinkDescriptor* lp, links )
78 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
79 return lp;
80 BOOST_FOREACH( LinkDescriptor* lp, links )
81 if (lp->autolink && lp->remoteNode == node && lp->service == service )
82 return lp;
83 return NULL;
84}
85
86void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
87 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
88 LinkDescriptor* ld = *i;
89 if ((communication ? ld->communicationId : ld->overlayId) == link) {
90 delete ld;
91 links.erase(i);
92 break;
93 }
94 }
95}
96
97LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
98 LinkDescriptor* desc = getDescriptor( link );
99 if ( desc == NULL ) {
100 desc = new LinkDescriptor();
101 desc->overlayId = link;
102 links.push_back(desc);
103 }
104 return desc;
105}
106
107/// returns a direct link relay descriptor to the given relay node
108LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) {
109 BOOST_FOREACH( LinkDescriptor* lp, links )
110 if (lp->remoteNode == relayNode &&
111 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
112 lp->relay == false &&
113 lp->up)
114 return lp;
115 return NULL;
116}
117
118/// find a proper relay node
119const NodeID BaseOverlay::findRelayNode( const NodeID id ) {
120 LinkDescriptor* rld = NULL;
121 NodeID relayNode = NodeID::UNSPECIFIED;
122
123 // get used next hop towards node
124 LinkID rlid = overlayInterface->getNextLinkId(id);
125 while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
126
127 // get descriptor of first hop
128 rld = getDescriptor(rlid);
129 logging_info( rld );
130
131 // is first hop a relay path? yes-> try to find real link!
132 if ( rld->relay )
133 relayNode = getRelayDescriptor(rld->localRelay)->remoteNode;
134
135 // no-> a proper relay node has been found
136 else relayNode = rld->remoteNode;
137 }
138
139 // if first relay is unknown choose a arbitrary direct node as relay
140 if ( relayNode.isUnspecified() ) {
141 for (size_t i=0; i<links.size(); i++)
142 if (links[i]->up &&
143 links[i]->communicationUp &&
144 !links[i]->relay &&
145 links[i]->keepAliveMissed <= 1 &&
146 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
147 relayNode = links[i]->remoteNode;
148 break;
149 }
150 }
151
152 // do not return myself or use the node as relay node
153 if (relayNode == nodeId)
154 return NodeID::UNSPECIFIED;
155 else {
156 logging_info( "Returning relay node " << relayNode.toString() );
157 return relayNode;
158 }
159}
160
161/// forwards a message over relays/directly using link descriptor
162seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
163
164 // directly send message
165 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
166 logging_debug("Send: Sending message via Base Communication");
167 return bc->sendMessage( ld->communicationId, message );
168 }
169
170 // relay message
171 else if ( ld->relay ) {
172
173 logging_debug("Send: Relaying message to node "
174 << ld->remoteNode.toString()
175 << " using relay " << ld->localRelay
176 );
177/*
178 // get local relay link descriptor and mark as used for relaying
179 LinkDescriptor* rld = getRelayDescriptor(ld->localRelay);
180 if (rld==NULL) {
181 logging_error("Send: Relay descriptor for relay " <<
182 ld->localRelay.toString() << " is unknown.");
183 return -1;
184 }
185 rld->markAsRelay();*/
186
187 // create a information relay message to inform the relay about
188 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
189 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
190 relayMsg.encapsulate( message );
191 overlay_msg.encapsulate( &relayMsg );
192
193 // route message to relay node in order to inform it!
194 logging_debug("sendMessage: Sending message over relayed link with" << ld );
195 sendOverlay( &overlay_msg, ld->localRelay );
196 return 0;
197 }
198
199 // error
200 else {
201 logging_error( "Could not send message descriptor=" << ld );
202 return -1;
203 }
204 return -1;
205}
206
207LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) {
208 for (size_t i=0; i<links.size(); i++)
209 if ( !links[i]->relay &&
210 links[i]->up &&
211 links[i]->communicationUp &&
212 links[i]->keepAliveMissed <= 1 &&
213 links[i]->remoteNode == nodeid &&
214 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
215 return links[i];
216 }
217 LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid));
218 if (ld != NULL && ld->relay && follow)
219 return getSendDescriptor(ld->localRelay, false);
220 return NULL;
221}
222
223/// routes a message over the overlay or directly sends it when a link is open
224seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid ) {
225 for (size_t i=0; i<links.size(); i++)
226 if ( !links[i]->relay &&
227 links[i]->up &&
228 links[i]->communicationUp &&
229 links[i]->keepAliveMissed <= 1 &&
230 links[i]->remoteNode == nodeid &&
231 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
232 links[i]->markAsRelay();
233 return sendMessage( message, links[i] );
234 break;
235 }
236 overlayInterface->routeMessage(nodeid, message);
237 return 0;
238}
239
240/// creates a link descriptor, apply relay semantics if possible
241LinkDescriptor* BaseOverlay::createLinkDescriptor(
242 const NodeID remoteNode, const ServiceID service, const LinkID link_id ) {
243
244 // find listener
245 if( !communicationListeners.contains( service ) ) {
246 logging_error( "No listener found for service " << service.toString() );
247 return NULL;
248 }
249 CommunicationListener* listener = communicationListeners.get( service );
250 assert( listener != NULL );
251
252 // copy link id
253 LinkID linkid = link_id;
254
255 // create link id if necessary
256 if ( linkid.isUnspecified() )
257 linkid = LinkID::create();
258
259 // create relay link descriptor
260 NodeID relayNode = findRelayNode(remoteNode);
261
262 // add descriptor
263 LinkDescriptor* ld = addDescriptor( linkid );
264 ld->overlayId = linkid;
265 ld->service = service;
266 ld->listener = listener;
267 ld->remoteNode = remoteNode;
268
269 // set relay node if available
270 ld->relay = !relayNode.isUnspecified();
271 ld->localRelay = relayNode;
272
273 if (!ld->relay)
274 logging_error("No relay found!");
275
276 // debug output
277 logging_debug( "Created link descriptor: " << ld );
278
279 return ld;
280}
281
282
283// ----------------------------------------------------------------------------
284
285use_logging_cpp(BaseOverlay);
286
287// ----------------------------------------------------------------------------
288
289BaseOverlay::BaseOverlay() :
290 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
291 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
292 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
293}
294
295BaseOverlay::~BaseOverlay() {
296}
297
298// ----------------------------------------------------------------------------
299
300void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
301 logging_info("Starting...");
302
303 // set parameters
304 bc = &_basecomm;
305 nodeId = _nodeid;
306
307 // register at base communication
308 bc->registerMessageReceiver( this );
309 bc->registerEventListener( this );
310
311 // timer for auto link management
312 Timer::setInterval( 500 );
313 Timer::start();
314
315 started = true;
316 state = BaseOverlayStateInvalid;
317}
318
319void BaseOverlay::stop() {
320 logging_info("Stopping...");
321
322 // stop timer
323 Timer::stop();
324
325 // delete oberlay interface
326 if(overlayInterface != NULL) {
327 delete overlayInterface;
328 overlayInterface = NULL;
329 }
330
331 // unregister at base communication
332 bc->unregisterMessageReceiver( this );
333 bc->unregisterEventListener( this );
334
335 started = false;
336 state = BaseOverlayStateInvalid;
337}
338
339bool BaseOverlay::isStarted(){
340 return started;
341}
342
343// ----------------------------------------------------------------------------
344
345void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
346 const EndpointDescriptor& bootstrapEp) {
347
348 if(id != spovnetId){
349 logging_error("attempt to join against invalid spovnet, call initiate first");
350 return;
351 }
352
353
354 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
355 logging_info( "Starting to join spovnet " << id.toString() <<
356 " with nodeid " << nodeId.toString());
357
358 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
359
360 // bootstrap against ourselfs
361 logging_debug("joining spovnet locally");
362
363 overlayInterface->joinOverlay();
364 state = BaseOverlayStateCompleted;
365 BOOST_FOREACH( NodeListener* i, nodeListeners )
366 i->onJoinCompleted( spovnetId );
367
368 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
369 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
370
371 logging_debug("starting overlay bootstrap module");
372 overlayBootstrap.start(this, spovnetId, nodeId);
373 overlayBootstrap.publish(bc->getEndpointDescriptor());
374
375 } else {
376
377 // bootstrap against another node
378 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
379
380 const LinkID& lnk = bc->establishLink( bootstrapEp );
381 bootstrapLinks.push_back(lnk);
382 logging_info("join process initiated for " << id.toString() << "...");
383 }
384}
385
386void BaseOverlay::leaveSpoVNet() {
387
388 logging_info( "Leaving spovnet " << spovnetId );
389 bool ret = ( state != this->BaseOverlayStateInvalid );
390
391 logging_debug("stopping overlay bootstrap module");
392 overlayBootstrap.stop();
393 overlayBootstrap.revoke();
394
395 logging_debug( "Dropping all auto-links" );
396
397 // gather all service links
398 vector<LinkID> servicelinks;
399 BOOST_FOREACH( LinkDescriptor* ld, links ) {
400 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
401 servicelinks.push_back( ld->overlayId );
402 }
403
404 // drop all service links
405 BOOST_FOREACH( LinkID lnk, servicelinks )
406 dropLink( lnk );
407
408 // let the node leave the spovnet overlay interface
409 logging_debug( "Leaving overlay" );
410 if( overlayInterface != NULL )
411 overlayInterface->leaveOverlay();
412
413 // drop still open bootstrap links
414 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
415 bc->dropLink( lnk );
416
417 // change to inalid state
418 state = BaseOverlayStateInvalid;
419 //ovl.visShutdown( ovlId, nodeId, string("") );
420
421 // inform all registered services of the event
422 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
423 if( ret ) i->onLeaveCompleted( spovnetId );
424 else i->onLeaveFailed( spovnetId );
425 }
426}
427
428void BaseOverlay::createSpoVNet(const SpoVNetID& id,
429 const OverlayParameterSet& param,
430 const SecurityParameterSet& sec,
431 const QoSParameterSet& qos) {
432
433 // set the state that we are an initiator, this way incoming messages are
434 // handled correctly
435 logging_info( "creating spovnet " + id.toString() <<
436 " with nodeid " << nodeId.toString() );
437
438 spovnetId = id;
439
440 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
441 if( overlayInterface == NULL ) {
442 logging_fatal( "overlay structure not supported" );
443 state = BaseOverlayStateInvalid;
444
445 BOOST_FOREACH( NodeListener* i, nodeListeners )
446 i->onJoinFailed( spovnetId );
447
448 return;
449 }
450}
451
452// ----------------------------------------------------------------------------
453
454const LinkID BaseOverlay::establishLink(
455 const EndpointDescriptor& ep, const NodeID& nodeid,
456 const ServiceID& service, const LinkID& linkid ) {
457
458 LinkID link_id = linkid;
459
460 // establish link via overlay
461 if (!nodeid.isUnspecified())
462 link_id = establishLink( nodeid, service, link_id );
463
464 // establish link directly if only ep is known
465 if (nodeid.isUnspecified())
466 establishLink( ep, service, link_id );
467
468 return link_id;
469}
470
471/// call base communication's establish link and add link mapping
472const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
473 const ServiceID& service, const LinkID& linkid ) {
474
475 // create a new link id if necessary
476 LinkID link_id = linkid;
477 if (link_id.isUnspecified()) link_id = LinkID::create();
478
479 /// find a service listener
480 if( !communicationListeners.contains( service ) ) {
481 logging_error( "No listener registered for service id=" << service.toString() );
482 return LinkID::UNSPECIFIED;
483 }
484 CommunicationListener* listener = communicationListeners.get( service );
485 assert( listener != NULL );
486
487 /// establish link and add mapping
488 logging_info("Establishing direct link " << link_id.toString()
489 << " using " << ep.toString());
490
491 // create descriptor
492 LinkDescriptor* ld = addDescriptor( link_id );
493 ld->overlayId = link_id;
494 ld->communicationId = link_id;
495 ld->listener = listener;
496 ld->service = service;
497 bc->establishLink( ep, link_id );
498
499 return link_id;
500}
501
502/// establishes a link between two arbitrary nodes
503const LinkID BaseOverlay::establishLink( const NodeID& node,
504 const ServiceID& service, const LinkID& link_id ) {
505
506 // do not establish a link to myself!
507 if (node == nodeId) return LinkID::UNSPECIFIED;
508
509 // create a link descriptor
510 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
511
512 // create link request message with own link id
513 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
514 LinkRequest link_request_msg(
515 nonce, &bc->getEndpointDescriptor(), false,
516 ld->overlayId, ld->localRelay );
517 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
518 overlay_msg.encapsulate( &link_request_msg );
519 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
520
521 // debug message
522 logging_debug(
523 "Sending link request with"
524 << " link id=" << ld->overlayId
525 << " node id=" << ld->remoteNode.toString()
526 << " service id=" << ld->service.toString()
527 << " local relay id=" << ld->localRelay.toString()
528 << " nonce= " << nonce
529 );
530
531 // sending message through new link
532 sendMessage( &overlay_msg, ld );
533
534 return ld->overlayId;
535}
536
537/// drops an established link
538void BaseOverlay::dropLink(const LinkID& link) {
539 logging_debug( "Dropping link (initiated locally):" << link.toString() );
540
541 // find the link item to drop
542 LinkDescriptor* ld = getDescriptor(link);
543 if( ld == NULL ) {
544 logging_warn( "Can't drop link, link is unknown!");
545 return;
546 }
547
548 // delete all queued messages
549 if( ld->messageQueue.size() > 0 ) {
550 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
551 << ld->messageQueue.size() << " waiting messages" );
552 ld->flushQueue();
553 }
554
555 // inform sideport and listener
556 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
557 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
558
559 // do not drop relay links
560 if (!ld->usedAsRelay) {
561 // drop the link in base communication
562 if (ld->communicationUp) bc->dropLink( ld->communicationId );
563
564 // erase descriptor
565 eraseDescriptor( ld->overlayId );
566 } else
567 ld->dropWhenRelaysLeft = true;
568}
569
570// ----------------------------------------------------------------------------
571
572/// internal send message, always use this functions to send messages over links
573seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
574 logging_debug( "Sending data message on link " << link.toString() );
575
576 // get the mapping for this link
577 LinkDescriptor* ld = getDescriptor(link);
578 if( ld == NULL ) {
579 logging_error("Could not send message. "
580 << "Link not found id=" << link.toString());
581 return -1;
582 }
583
584 // check if the link is up yet, if its an auto link queue message
585 if( !ld->up ) {
586 ld->markAsUsed();
587 if( ld->autolink ) {
588 logging_info("Auto-link " << link.toString() << " not up, queue message");
589 Data data = data_serialize( message );
590 const_cast<Message*>(message)->dropPayload();
591 ld->messageQueue.push_back( new Message(data) );
592 } else {
593 logging_error("Link " << link.toString() << " not up, drop message");
594 }
595 return -1;
596 }
597
598 // compile overlay message (has service and node id)
599 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
600 overmsg.encapsulate( const_cast<Message*>(message) );
601
602 // send message over relay/direct/overlay
603 return sendMessage( &overmsg, ld );
604}
605
606seqnum_t BaseOverlay::sendMessage(const Message* message,
607 const NodeID& node, const ServiceID& service) {
608
609 // find link for node and service
610 LinkDescriptor* ld = getAutoDescriptor( node, service );
611
612 // if we found no link, create an auto link
613 if( ld == NULL ) {
614
615 // debug output
616 logging_info( "No link to send message to node "
617 << node.toString() << " found for service "
618 << service.toString() << ". Creating auto link ..."
619 );
620
621 // this will call onlinkup on us, if everything worked we now have a mapping
622 LinkID link = LinkID::create();
623
624 // call base overlay to create a link
625 link = establishLink( node, service, link );
626 ld = getDescriptor( link );
627 if( ld == NULL ) {
628 logging_error( "Failed to establish auto-link.");
629 return -1;
630 }
631 ld->autolink = true;
632
633 logging_debug( "Auto-link establishment in progress to node "
634 << node.toString() << " with link id=" << link.toString() );
635 }
636 assert(ld != NULL);
637
638 // mark the link as used, as we now send a message through it
639 ld->markAsUsed();
640
641 // send / queue message
642 return sendMessage( message, ld->overlayId );
643}
644
645// ----------------------------------------------------------------------------
646
647const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
648 const LinkID& link) const {
649
650 // return own end-point descriptor
651 if( link == LinkID::UNSPECIFIED )
652 return bc->getEndpointDescriptor();
653
654 // find link descriptor. not found -> return unspecified
655 const LinkDescriptor* ld = getDescriptor(link);
656 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
657
658 // return endpoint-descriptor from base communication
659 return bc->getEndpointDescriptor( ld->communicationId );
660}
661
662const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
663 const NodeID& node) const {
664
665 // return own end-point descriptor
666 if( node == nodeId || node == NodeID::UNSPECIFIED )
667 return bc->getEndpointDescriptor();
668
669 // no joined and request remote descriptor? -> fail!
670 if( overlayInterface == NULL ) {
671 logging_error( "overlay interface not set, cannot resolve endpoint" );
672 return EndpointDescriptor::UNSPECIFIED();
673 }
674
675 // resolve end-point descriptor from the base-overlay routing table
676 return overlayInterface->resolveNode( node );
677}
678
679// ----------------------------------------------------------------------------
680
681bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
682 sideport = _sideport;
683 _sideport->configure( this );
684}
685
686bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
687 sideport = &SideportListener::DEFAULT;
688}
689
690// ----------------------------------------------------------------------------
691
692bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
693 logging_debug( "binding communication listener " << listener
694 << " on serviceid " << sid.toString() );
695
696 if( communicationListeners.contains( sid ) ) {
697 logging_error( "some listener already registered for service id "
698 << sid.toString() );
699 return false;
700 }
701
702 communicationListeners.registerItem( listener, sid );
703 return true;
704}
705
706
707bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
708 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
709
710 if( !communicationListeners.contains( sid ) ) {
711 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
712 return false;
713 }
714
715 if( communicationListeners.get(sid) != listener ) {
716 logging_warn( "listener bound to service id " << sid.toString()
717 << " is different than listener trying to unbind" );
718 return false;
719 }
720
721 communicationListeners.unregisterItem( sid );
722 return true;
723}
724
725// ----------------------------------------------------------------------------
726
727bool BaseOverlay::bind(NodeListener* listener) {
728 logging_debug( "Binding node listener " << listener );
729
730 // already bound? yes-> warning
731 NodeListenerVector::iterator i =
732 find( nodeListeners.begin(), nodeListeners.end(), listener );
733 if( i != nodeListeners.end() ) {
734 logging_warn("Node listener " << listener << " is already bound!" );
735 return false;
736 }
737
738 // no-> add
739 nodeListeners.push_back( listener );
740 return true;
741}
742
743bool BaseOverlay::unbind(NodeListener* listener) {
744 logging_debug( "Unbinding node listener " << listener );
745
746 // already unbound? yes-> warning
747 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
748 if( i == nodeListeners.end() ) {
749 logging_warn( "Node listener " << listener << " is not bound!" );
750 return false;
751 }
752
753 // no-> remove
754 nodeListeners.erase( i );
755 return true;
756}
757
758// ----------------------------------------------------------------------------
759
760void BaseOverlay::onLinkUp(const LinkID& id,
761 const address_v* local, const address_v* remote) {
762 logging_debug( "Link up with base communication link id=" << id );
763
764 // get descriptor for link
765 LinkDescriptor* ld = getDescriptor(id, true);
766
767 // handle bootstrap link we initiated
768 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
769 logging_info(
770 "Join has been initiated by me and the link is now up. " <<
771 "Sending out join request for SpoVNet " << spovnetId.toString()
772 );
773
774 // send join request message
775 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
776 JoinRequest joinRequest( spovnetId, nodeId );
777 overlayMsg.encapsulate( &joinRequest );
778 bc->sendMessage( id, &overlayMsg );
779 return;
780 }
781
782 // no link found? -> link establishment from remote, add one!
783 if (ld == NULL) {
784 ld = addDescriptor( id );
785 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
786
787 // update descriptor
788 ld->fromRemote = true;
789 ld->communicationId = id;
790 ld->communicationUp = true;
791 ld->markAsUsed();
792
793 // in this case, do not inform listener, since service it unknown
794 // -> wait for update message!
795
796 // link mapping found? -> send update message with node-id and service id
797 } else {
798 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
799
800 // note: necessary to validate the link on the remote side!
801 logging_debug( "Sending out update" <<
802 " for service " << ld->service.toString() <<
803 " with local node id " << nodeId.toString() <<
804 " on link " << ld->overlayId.toString() );
805
806 // update descriptor
807 ld->markAsUsed();
808 ld->communicationUp = true;
809
810 // if link is a relayed link ->convert to direct link
811 if (ld->relay) {
812 logging_info( "Converting to direct link: " << ld );
813 ld->up = true;
814 ld->relay = false;
815 ld->localRelay = NodeID::UNSPECIFIED;
816 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
817 overMsg.setRelayLink( ld->remoteLinkId );
818 bc->sendMessage( ld->communicationId, &overMsg );
819 }
820
821 // compile and send update message
822 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
823 overlayMsg.setAutoLink( ld->autolink );
824 bc->sendMessage( ld->communicationId, &overlayMsg );
825 }
826}
827
828void BaseOverlay::onLinkDown(const LinkID& id,
829 const address_v* local, const address_v* remote) {
830
831 // erase bootstrap links
832 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
833 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
834
835 // get descriptor for link
836 LinkDescriptor* ld = getDescriptor(id, true);
837 if ( ld == NULL ) return; // not found? ->ignore!
838 logging_info( "onLinkDown descriptor: " << ld );
839
840 // inform listeners about link down
841 ld->communicationUp = false;
842 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
843 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
844
845 // delete all queued messages (auto links)
846 if( ld->messageQueue.size() > 0 ) {
847 logging_warn( "Dropping link " << id.toString() << " that has "
848 << ld->messageQueue.size() << " waiting messages" );
849 ld->flushQueue();
850 }
851
852 // erase mapping
853 eraseDescriptor(ld->overlayId);
854}
855
856void BaseOverlay::onLinkChanged(const LinkID& id,
857 const address_v* oldlocal, const address_v* newlocal,
858 const address_v* oldremote, const address_v* newremote) {
859
860 // get descriptor for link
861 LinkDescriptor* ld = getDescriptor(id, true);
862 if ( ld == NULL ) return; // not found? ->ignore!
863 logging_debug( "onLinkChanged descriptor: " << ld );
864
865 // inform listeners
866 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
867 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
868
869 // autolinks: refresh timestamp
870 ld->markAsUsed();
871}
872
873void BaseOverlay::onLinkFail(const LinkID& id,
874 const address_v* local, const address_v* remote) {
875 logging_debug( "Link fail with base communication link id=" << id );
876
877 // erase bootstrap links
878 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
879 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
880
881 // get descriptor for link
882 LinkDescriptor* ld = getDescriptor(id, true);
883 if ( ld == NULL ) return; // not found? ->ignore!
884 logging_debug( "Link failed id=" << ld->overlayId.toString() );
885
886 // inform listeners
887 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
888 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
889
890 // autolinks: refresh timestamp
891 ld->markAsUsed();
892}
893
894void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
895 const address_v* remote, const QoSParameterSet& qos) {
896 logging_debug( "Link quality changed with base communication link id=" << id );
897
898 // get descriptor for link
899 LinkDescriptor* ld = getDescriptor(id, true);
900 if ( ld == NULL ) return; // not found? ->ignore!
901 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
902
903 // autolinks: refresh timestamp
904 ld->markAsUsed();
905}
906
907bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
908 const address_v* remote ) {
909 logging_debug("Accepting link request from " << remote->to_string() );
910 return true;
911}
912
913/// handles a message from base communication
914bool BaseOverlay::receiveMessage(const Message* message,
915 const LinkID& link, const NodeID& ) {
916 // get descriptor for link
917 LinkDescriptor* ld = getDescriptor( link, true );
918
919 // link known?
920 if (ld == NULL) { // no-> handle with unspecified params
921 logging_debug("Received message from base communication, link descriptor unknown" );
922 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
923 } else { // yes -> handle with overlay link id
924 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
925 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
926 }
927}
928
929// ----------------------------------------------------------------------------
930
931/// handles a message from an overlay
932void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
933 logging_debug("Received message from overlay -- "
934 << " link id=" << link.toString()
935 << " node id=" << source.toString() );
936 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
937}
938
939// ----------------------------------------------------------------------------
940
941/// handles an incoming message
942bool BaseOverlay::handleMessage( const Message* message,
943 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
944 logging_debug( "Handling message: " << message->toString());
945
946 // decapsulate overlay message
947 OverlayMsg* overlayMsg =
948 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
949 if( overlayMsg == NULL ) return false;
950
951 // mark the link as in action
952 LinkDescriptor* ld = getDescriptor(boLink);
953 if (ld == NULL) ld = getDescriptor(bcLink, true);
954 if (ld != NULL) {
955 ld->markAsUsed();
956 ld->markAlive();
957 }
958
959 switch ( overlayMsg->getType() ) {
960 // ---------------------------------------------------------------------
961 // Handle spovnet instance join requests
962 // ---------------------------------------------------------------------
963 case OverlayMsg::typeJoinRequest: {
964
965 // decapsulate message
966 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
967 logging_info( "Received join request for spovnet " <<
968 joinReq->getSpoVNetID().toString() );
969
970 // check spovnet id
971 if( joinReq->getSpoVNetID() != spovnetId ) {
972 logging_error(
973 "Received join request for spovnet we don't handle " <<
974 joinReq->getSpoVNetID().toString() );
975 return false;
976 }
977
978 // TODO: here you can implement mechanisms to deny joining of a node
979 bool allow = true;
980 logging_info( "Sending join reply for spovnet " <<
981 spovnetId.toString() << " to node " <<
982 overlayMsg->getSourceNode().toString() <<
983 ". Result: " << (allow ? "allowed" : "denied") );
984 joiningNodes.push_back( overlayMsg->getSourceNode() );
985
986 // return overlay parameters
987 assert( overlayInterface != NULL );
988 logging_debug( "Using bootstrap end-point "
989 << getEndpointDescriptor().toString() )
990 OverlayParameterSet parameters = overlayInterface->getParameters();
991 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
992 JoinReply replyMsg( spovnetId, parameters,
993 allow, getEndpointDescriptor() );
994 retmsg.encapsulate(&replyMsg);
995 bc->sendMessage( bcLink, &retmsg );
996 return true;
997 }
998
999 // ---------------------------------------------------------------------
1000 // handle replies to spovnet instance join requests
1001 // ---------------------------------------------------------------------
1002 case OverlayMsg::typeJoinReply: {
1003
1004 // decapsulate message
1005 logging_debug("received join reply message");
1006 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1007
1008 // correct spovnet?
1009 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1010 logging_error( "Received SpoVNet join reply for " <<
1011 replyMsg->getSpoVNetID().toString() <<
1012 " != " << spovnetId.toString() );
1013 return false;
1014 }
1015
1016 // access granted? no -> fail
1017 if( !replyMsg->getJoinAllowed() ) {
1018 logging_error( "Our join request has been denied" );
1019
1020 // drop initiator link
1021
1022 if(bcLink != LinkID::UNSPECIFIED){
1023 bc->dropLink( bcLink );
1024
1025 vector<LinkID>::iterator it = std::find(
1026 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1027 if( it != bootstrapLinks.end() )
1028 bootstrapLinks.erase(it);
1029 }
1030
1031 // inform all registered services of the event
1032 BOOST_FOREACH( NodeListener* i, nodeListeners )
1033 i->onJoinFailed( spovnetId );
1034
1035 return true;
1036 }
1037
1038 // access has been granted -> continue!
1039 logging_info("Join request has been accepted for spovnet " <<
1040 spovnetId.toString() );
1041
1042 logging_debug( "Using bootstrap end-point "
1043 << replyMsg->getBootstrapEndpoint().toString() );
1044
1045 //
1046 // create overlay structure from spovnet parameter set
1047 // if we have not boostrapped yet against some other node
1048 //
1049
1050 if( overlayInterface == NULL ){
1051
1052 logging_debug("first-time bootstrapping");
1053
1054 overlayInterface = OverlayFactory::create(
1055 *this, replyMsg->getParam(), nodeId, this );
1056
1057 // overlay structure supported? no-> fail!
1058 if( overlayInterface == NULL ) {
1059 logging_error( "overlay structure not supported" );
1060
1061 if(bcLink != LinkID::UNSPECIFIED){
1062 bc->dropLink( bcLink );
1063
1064 vector<LinkID>::iterator it = std::find(
1065 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1066 if( it != bootstrapLinks.end() )
1067 bootstrapLinks.erase(it);
1068 }
1069
1070 // inform all registered services of the event
1071 BOOST_FOREACH( NodeListener* i, nodeListeners )
1072 i->onJoinFailed( spovnetId );
1073
1074 return true;
1075 }
1076
1077 // everything ok-> join the overlay!
1078 state = BaseOverlayStateCompleted;
1079 overlayInterface->createOverlay();
1080
1081 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1082
1083 // update ovlvis
1084 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1085
1086 // inform all registered services of the event
1087 BOOST_FOREACH( NodeListener* i, nodeListeners )
1088 i->onJoinCompleted( spovnetId );
1089
1090 } else {
1091
1092 // this is not the first bootstrap, just join the additional node
1093 logging_debug("not first-time bootstrapping");
1094 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1095
1096 } // if( overlayInterface == NULL )
1097
1098 return true;
1099 }
1100
1101 // ---------------------------------------------------------------------
1102 // handle data forward messages
1103 // ---------------------------------------------------------------------
1104 case OverlayMsg::typeData: {
1105
1106 // get service
1107 const ServiceID& service = overlayMsg->getService();
1108 logging_debug( "received data for service " << service.toString() );
1109
1110 // find listener
1111 CommunicationListener* listener =
1112 communicationListeners.get( service );
1113 if( listener == NULL ) return true;
1114
1115 // delegate data message
1116 listener->onMessage( overlayMsg,
1117 overlayMsg->getSourceNode(), ld->overlayId );
1118
1119 return true;
1120 }
1121
1122 // ---------------------------------------------------------------------
1123 // handle update messages for link establishment
1124 // ---------------------------------------------------------------------
1125 case OverlayMsg::typeUpdate: {
1126 logging_debug("Received type update message on link " << ld );
1127
1128 // get info
1129 const NodeID& sourcenode = overlayMsg->getSourceNode();
1130 const ServiceID& service = overlayMsg->getService();
1131
1132 // no link descriptor available -> error!
1133 if( ld == NULL ) {
1134 logging_warn( "received overlay update message for link " <<
1135 ld->overlayId.toString() << " for which we have no mapping" );
1136 return false;
1137 }
1138
1139 // update our link mapping information for this link
1140 bool changed =
1141 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1142 ld->remoteNode = sourcenode;
1143 ld->service = service;
1144 ld->autolink = overlayMsg->isAutoLink();
1145
1146 // if our link information changed, we send out an update, too
1147 if( changed ) {
1148 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1149 overMsg.setAutoLink(ld->autolink);
1150 bc->sendMessage( ld->communicationId, &overMsg );
1151 }
1152
1153 // service registered? no-> error!
1154 if( !communicationListeners.contains( service ) ) {
1155 logging_warn( "Link up: event listener has not been registered" );
1156 return false;
1157 }
1158
1159 // default or no service registered?
1160 CommunicationListener* listener = communicationListeners.get( service );
1161 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1162 logging_warn("Link up: event listener is default or null!" );
1163 return true;
1164 }
1165
1166 // update descriptor
1167 ld->listener = listener;
1168 ld->markAsUsed();
1169 ld->markAlive();
1170
1171 // ask the service whether it wants to accept this link
1172 if( !listener->onLinkRequest(sourcenode) ) {
1173
1174 logging_debug("Link id=" << ld->overlayId.toString() <<
1175 " has been denied by service " << service.toString() << ", dropping link");
1176
1177 // prevent onLinkDown calls to the service
1178 ld->listener = &CommunicationListener::DEFAULT;
1179
1180 // drop the link
1181 dropLink( ld->overlayId );
1182 return true;
1183 }
1184
1185 // set link up
1186 ld->up = true;
1187 logging_debug(
1188 "Link " << ld->overlayId.toString()
1189 << " has been accepted by service " << service.toString()
1190 << " and is now up"
1191 );
1192
1193 // auto links: link has been accepted -> send queued messages
1194 if( ld->messageQueue.size() > 0 ) {
1195 logging_info( "sending out queued messages on link " <<
1196 ld->overlayId.toString() );
1197 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1198 sendMessage( msg, ld->overlayId );
1199 delete msg;
1200 }
1201 ld->messageQueue.clear();
1202 }
1203
1204 // call the notification functions
1205 listener->onLinkUp( ld->overlayId, sourcenode );
1206 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1207
1208 return true;
1209 }
1210
1211 // ---------------------------------------------------------------------
1212 // handle link request forwarded through the overlay
1213 // ---------------------------------------------------------------------
1214 case OverlayMsg::typeLinkRequest: {
1215
1216 logging_debug( "received link request on link" );
1217
1218 // decapsulate message
1219 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1220 const ServiceID& service = overlayMsg->getService();
1221
1222 // is request reply?
1223 if ( linkReq->isReply() ) {
1224
1225 // find link
1226 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1227 if ( i == pendingLinks.end() ) {
1228 logging_error( "Nonce not found in link request" );
1229 return true;
1230 }
1231
1232 // debug message
1233 logging_debug( "Link request reply received. Establishing link "
1234 << i->second << " to " << (linkReq->getEndpoint()->toString())
1235 << " for service " << service.toString()
1236 << " with nonce " << linkReq->getNonce()
1237 << " using relay " << linkReq->getRelay().toString()
1238 << " and remote link id=" << linkReq->getRemoteLinkId()
1239 );
1240
1241 // get descriptor
1242 LinkDescriptor* ldn = getDescriptor(i->second);
1243
1244 // check if link request reply has a relay node ...
1245 if (!linkReq->getRelay().isUnspecified()) { // yes->
1246 ldn->up = true;
1247 ldn->relay = true;
1248 if (ldn->localRelay.isUnspecified()) {
1249 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1250 showLinkState();
1251 }
1252 ldn->remoteRelay = linkReq->getRelay();
1253 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1254 ldn->remoteNode = overlayMsg->getSourceNode();
1255
1256 ldn->markAlive();
1257
1258 // compile and send update message
1259 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1260 _overlayMsg.setAutoLink(ldn->autolink);
1261 sendMessage( &_overlayMsg, ldn );
1262
1263 // auto links: link has been accepted -> send queued messages
1264 if( ldn->messageQueue.size() > 0 ) {
1265 logging_info( "Sending out queued messages on link " <<
1266 ldn->overlayId.toString() );
1267 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1268 sendMessage( msg, ldn->overlayId );
1269 delete msg;
1270 }
1271 ldn->messageQueue.clear();
1272 }
1273
1274 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1275
1276 // try to establish a direct link
1277 ldn->communicationId =
1278 bc->establishLink( *linkReq->getEndpoint(), i->second );
1279 }
1280
1281 // no relay node-> use overlay routing
1282 else {
1283 ldn->up = true;
1284
1285 // establish direct link
1286 ldn->communicationId =
1287 bc->establishLink( *linkReq->getEndpoint(), i->second );
1288 }
1289 } else {
1290 logging_debug( "Link request received from node id="
1291 << overlayMsg->getSourceNode() );
1292
1293 // create link descriptor
1294 LinkDescriptor* ldn =
1295 createLinkDescriptor(overlayMsg->getSourceNode(),
1296 overlayMsg->getService(), LinkID::UNSPECIFIED );
1297 assert(!ldn->overlayId.isUnspecified());
1298
1299 // create reply message
1300 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1301 LinkRequest link_request_msg(
1302 linkReq->getNonce(),
1303 &bc->getEndpointDescriptor(),
1304 true, ldn->overlayId, ldn->localRelay
1305 );
1306 overlay_msg.encapsulate( &link_request_msg );
1307
1308 // debug message
1309 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1310 linkReq->getNonce() );
1311
1312 // if this is a relay link-> update information & inform listeners
1313 if (!linkReq->getRelay().isUnspecified()) {
1314 // set flags
1315 ldn->up = true;
1316 ldn->relay = true;
1317 if (ldn->localRelay.isUnspecified()) {
1318 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1319 showLinkState();
1320 }
1321 ldn->remoteRelay = linkReq->getRelay();
1322 ldn->remoteNode = overlayMsg->getSourceNode();
1323 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1324 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1325 }
1326
1327 // route message back over overlay
1328 sendMessage( &overlay_msg, ldn );
1329 }
1330 return true;
1331 }
1332
1333 // ---------------------------------------------------------------------
1334 // handle relay message to forward messages
1335 // ---------------------------------------------------------------------
1336 case OverlayMsg::typeRelay: {
1337
1338 logging_debug( "received relay request on link" );
1339
1340 // decapsulate message
1341 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1342
1343 // is relay message informative?
1344 switch (relayMsg->getType()) {
1345
1346 // handle relay notification
1347 case RelayMessage::typeInform: {
1348 logging_info("Received relay information message with"
1349 << " relay " << relayMsg->getRelayNode()
1350 << " destination " << relayMsg->getDestNode() );
1351
1352 // mark incoming link as relay
1353 if (ld!=NULL) ld->markAsRelay();
1354
1355 // am I the destination of this message? yes->
1356 if (relayMsg->getDestNode() == nodeId ) {
1357 // deliver relay message locally!
1358 logging_debug("Relay message reached destination. Handling the message.");
1359 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1360 return true;
1361 }
1362
1363 // create route message
1364 OverlayMsg _overMsg( *overlayMsg );
1365 RelayMessage _relayMsg( *relayMsg );
1366 _relayMsg.setType( RelayMessage::typeRoute );
1367 _overMsg.encapsulate( &_relayMsg );
1368
1369 // forward message
1370 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1371 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1372 sendOverlay( &_overMsg, relayMsg->getDestNode() );
1373 } else {
1374 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1375 sendOverlay( &_overMsg, relayMsg->getRelayNode() );
1376 }
1377 return true;
1378 }
1379
1380 // handle relay routing
1381 case RelayMessage::typeRoute: {
1382 logging_info("Received relay route message with"
1383 << " relay " << relayMsg->getRelayNode()
1384 << " destination " << relayMsg->getDestNode() );
1385
1386 // mark incoming link as relay
1387 if (ld!=NULL) ld->markAsRelay();
1388
1389 // am I the destination of this message? yes->
1390 if (relayMsg->getDestNode() == nodeId ) {
1391 // deliver relay message locally!
1392 logging_debug("Relay message reached destination. Handling the message.");
1393 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1394 return true;
1395 }
1396
1397 // am I the relay for this message? yes->
1398 if (relayMsg->getRelayNode() == nodeId ) {
1399 logging_debug("I'm the relay for this message. Sending to destination.");
1400 OverlayMsg _overMsg( *overlayMsg );
1401 RelayMessage _relayMsg( *relayMsg );
1402 _overMsg.encapsulate(&_relayMsg);
1403
1404 /// this must be handled by using relay link!
1405 sendOverlay(&_overMsg, relayMsg->getDestNode());
1406 return true;
1407 }
1408
1409 // error: I'm not a relay or destination!
1410 logging_error("This node is neither relay nor destination. Dropping Message!");
1411 return true;
1412 }
1413 default: {
1414 logging_error("RelayMessage Unknown!");
1415 return true;
1416 }
1417 }
1418
1419 break;
1420 }
1421
1422 // ---------------------------------------------------------------------
1423 // handle keep-alive messages
1424 // ---------------------------------------------------------------------
1425 case OverlayMsg::typeKeepAlive: {
1426 logging_debug( "received keep-alive on link" );
1427 if ( ld != NULL ) {
1428 logging_info("Keep-Alive for "<< ld->overlayId);
1429 ld->markAlive();
1430 }
1431 break;
1432 }
1433
1434 // ---------------------------------------------------------------------
1435 // handle direct link replacement messages
1436 // ---------------------------------------------------------------------
1437 case OverlayMsg::typeDirectLink: {
1438
1439 logging_debug( "Received direct link replacement request" );
1440
1441 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1442 if (rld == NULL) {
1443 logging_error("Direct link replacement: Link "
1444 << overlayMsg->getRelayLink() << "not found error." );
1445 break;
1446 }
1447 logging_info( "Received direct link convert notification for " << rld );
1448
1449 // set communcation link id and set it up
1450 rld->communicationId = ld->communicationId;
1451
1452 // this is neccessary since this link was a relay link before!
1453 rld->communicationUp = true;
1454
1455 // this is not a relay link anymore!
1456 rld->relay = false;
1457 rld->localRelay = NodeID::UNSPECIFIED;
1458 rld->remoteRelay = NodeID::UNSPECIFIED;
1459
1460 // mark used and alive!
1461 rld->markAsUsed();
1462 rld->markAlive();
1463
1464 // erase the original descriptor
1465 eraseDescriptor(ld->overlayId);
1466 break;
1467 }
1468
1469 // ---------------------------------------------------------------------
1470 // handle unknown message type
1471 // ---------------------------------------------------------------------
1472 default: {
1473 logging_error( "received message in invalid state! don't know " <<
1474 "what to do with this message of type " <<
1475 overlayMsg->getType() );
1476 return false;
1477 }
1478 } /* switch */
1479
1480 return false;
1481}
1482
1483// ----------------------------------------------------------------------------
1484
1485void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1486
1487 logging_debug( "broadcasting message to all known nodes " <<
1488 "in the overlay from service " + service.toString() );
1489
1490 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1491 OverlayInterface::NodeList::iterator i = nodes.begin();
1492 for(; i != nodes.end(); i++ ) {
1493 if( *i == nodeId) continue; // don't send to ourselfs
1494 sendMessage( message, *i, service );
1495 }
1496}
1497
1498vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1499 // the known nodes _can_ also include our node, so we remove ourself
1500 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1501
1502 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1503 if( i != nodes.end() ) nodes.erase( i );
1504
1505 return nodes;
1506}
1507
1508const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1509 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1510 const LinkDescriptor* ld = getDescriptor(lid);
1511 if( ld == NULL ) return NodeID::UNSPECIFIED;
1512 else return ld->remoteNode;
1513}
1514
1515vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1516 vector<LinkID> linkvector;
1517 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1518 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1519 linkvector.push_back( ld->overlayId );
1520 }
1521 }
1522 return linkvector;
1523}
1524
1525
1526void BaseOverlay::onNodeJoin(const NodeID& node) {
1527 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1528 if( i == joiningNodes.end() ) return;
1529
1530 logging_info( "node has successfully joined baseoverlay and overlay structure "
1531 << node.toString() );
1532
1533 joiningNodes.erase( i );
1534}
1535
1536void BaseOverlay::eventFunction() {
1537
1538 // send keep-alive messages over established links
1539 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1540 if (!ld->up) continue;
1541 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1542 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1543 sendMessage( &overMsg, ld );
1544 }
1545
1546 // iterate over all links and check for time boundaries
1547 vector<LinkDescriptor*> oldlinks;
1548 time_t now = time(NULL);
1549 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1550 // remote used as relay flag
1551 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1552 ld->usedAsRelay = false;
1553
1554 // keep alives and not up? yes-> link connection request is stale!
1555 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1556
1557 // increase counter
1558 ld->keepAliveMissed++;
1559
1560 // missed more than four keep-alive messages (10 sec)? -> drop link
1561 if (ld->keepAliveMissed > 4) {
1562 logging_info( "Link connection request is stale, closing: " << ld );
1563 oldlinks.push_back( ld );
1564 continue;
1565 }
1566 }
1567
1568 if (!ld->up) continue;
1569
1570 // drop links that are dropped and not used as relay
1571 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1572 oldlinks.push_back( ld );
1573 continue;
1574 }
1575
1576 // auto-link time exceeded?
1577 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1578 oldlinks.push_back( ld );
1579 continue;
1580 }
1581
1582 // keep alives missed? yes->
1583 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1584
1585 // increase counter
1586 ld->keepAliveMissed++;
1587
1588 // missed more than four keep-alive messages (4 sec)? -> drop link
1589 if (ld->keepAliveMissed >= 8) {
1590 logging_info( "Link is stale, closing: " << ld );
1591 oldlinks.push_back( ld );
1592 continue;
1593 }
1594 }
1595 }
1596
1597 // drop links
1598 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1599/*
1600 vector<LinkID>::iterator it = std::find(
1601 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1602
1603 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1604 logging_info( "Not dropping initiator link: " << ld );
1605 continue;
1606 }
1607*/
1608 logging_info( "Link timed out. Dropping " << ld );
1609 dropLink( ld->overlayId );
1610 }
1611
1612 // show link state
1613 counter++;
1614 if (counter>=4) showLinkState();
1615 if (counter>=4 || counter<0) counter = 0;
1616}
1617
1618void BaseOverlay::showLinkState() {
1619 int i=0;
1620 logging_info("--- link state -------------------------------");
1621 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1622 logging_info("link " << i << ": " << ld);
1623 i++;
1624 }
1625 logging_info("----------------------------------------------");
1626}
1627
1628}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.