source: source/ariba/overlay/BaseOverlay.cpp@ 5860

Last change on this file since 5860 was 5860, checked in by Christoph Mayer, 15 years ago

networkinfo fix wenn socket kaputt geht, erfolgreich verwendete bootstrap infos speichern und wenn overlay verbindungen alle weg sind diese infos ausprobieren

File size: 51.6 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
63 BOOST_FOREACH( LinkDescriptor* lp, links )
64 if ((communication ? lp->communicationId : lp->overlayId) == link)
65 return lp;
66 return NULL;
67}
68
69const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
70 BOOST_FOREACH( const LinkDescriptor* lp, links )
71 if ((communication ? lp->communicationId : lp->overlayId) == link)
72 return lp;
73 return NULL;
74}
75
76/// erases a link descriptor
77void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
78 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
79 LinkDescriptor* ld = *i;
80 if ((communication ? ld->communicationId : ld->overlayId) == link) {
81 delete ld;
82 links.erase(i);
83 break;
84 }
85 }
86}
87
88/// adds a link descriptor
89LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
90 LinkDescriptor* desc = getDescriptor( link );
91 if ( desc == NULL ) {
92 desc = new LinkDescriptor();
93 desc->overlayId = link;
94 links.push_back(desc);
95 }
96 return desc;
97}
98
99/// returns a auto-link descriptor
100LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
101 BOOST_FOREACH( LinkDescriptor* lp, links )
102 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
103 return lp;
104 BOOST_FOREACH( LinkDescriptor* lp, links )
105 if (lp->autolink && lp->remoteNode == node && lp->service == service )
106 return lp;
107 return NULL;
108}
109
110/// returns a direct local link relay descriptor for the given remote node
111LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& id ) {
112
113 // get used next hop towards node
114 LinkDescriptor* rld = NULL;
115 NodeID relayNode = NodeID::UNSPECIFIED;
116 LinkID rlid = overlayInterface->getNextLinkId(id);
117 if ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
118 // get descriptor of first hop
119 rld = getDescriptor(rlid);
120
121 // is first hop a relay path use local relay
122 if ( rld->relay ) relayNode = rld->localRelay;
123
124 // no-> a proper relay node has been found
125 else relayNode = rld->remoteNode;
126 }
127
128 // if first relay is unknown choose a arbitrary direct node as relay
129 if ( relayNode.isUnspecified() ) {
130 for (size_t i=0; i<links.size(); i++)
131 if (links[i]->up &&
132 links[i]->communicationUp &&
133 !links[i]->relay &&
134 links[i]->keepAliveMissed <= 1 &&
135 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
136 relayNode = links[i]->remoteNode;
137 break;
138 }
139 }
140
141 // no local relay found-> damn!
142 if (relayNode.isUnspecified()) return NULL;
143
144 // get descriptor
145 BOOST_FOREACH( LinkDescriptor* lp, links )
146 if (lp->remoteNode == relayNode &&
147 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
148 lp->relay == false &&
149 lp->up)
150 return lp;
151
152 return NULL;
153}
154
155/// returns the link descriptor that is actually used for sending a message over the overöay
156LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) {
157 for (size_t i=0; i<links.size(); i++)
158 if ( !links[i]->relay &&
159 links[i]->up &&
160 links[i]->communicationUp &&
161 links[i]->keepAliveMissed <= 1 &&
162 links[i]->remoteNode == nodeid &&
163 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
164 return links[i];
165 }
166 LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid));
167 if (ld != NULL && ld->relay && follow)
168 return getSendDescriptor(ld->localRelay, false);
169 return NULL;
170}
171
172NodeID BaseOverlay::getRelayNode( const NodeID& remoteNode ) {
173 LinkDescriptor* rld = getRelayDescriptor(remoteNode);
174 return rld!=NULL ? rld->remoteNode : NodeID::UNSPECIFIED;
175}
176
177/// routes a message over the overlay or directly sends it when a link is open
178seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid, const NodeID& remoteRelay ) {
179 /// send message directly to a neighbor
180 for (size_t i=0; i<links.size(); i++)
181 if ( !links[i]->relay &&
182 links[i]->up &&
183 links[i]->communicationUp &&
184 links[i]->keepAliveMissed <= 1 &&
185 links[i]->remoteNode == nodeid &&
186 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
187
188 // mark as relay and send message
189 links[i]->markAsRelay();
190 return sendMessage( message, links[i] );
191 }
192
193 /// send relayed message over the overlay
194 if (!remoteRelay.isUnspecified()) {
195 // create a information relay message to inform the relay about
196 OverlayMsg overlay_msg(
197 OverlayMsg::typeRelay, OverlayInterface::OVERLAY_SERVICE_ID, nodeId);
198 RelayMessage relayMsg( RelayMessage::typeInform, remoteRelay, nodeid, LinkID::UNSPECIFIED );
199 relayMsg.encapsulate( message );
200 overlay_msg.encapsulate( &relayMsg );
201
202 // get local relay link
203 LinkDescriptor* rld = getRelayDescriptor(nodeid);
204
205 // local relay available? send to local relay!
206 if (rld!=NULL) {
207 rld->markAsRelay();
208 sendMessage(&overlay_msg, rld);
209 } else
210 overlayInterface->routeMessage(remoteRelay, &overlay_msg);
211
212 // finished
213 return 0;
214 }
215
216 // common case: send message over the overlay
217 overlayInterface->routeMessage(nodeid, message);
218 return 0;
219}
220
221/// forwards a message over relays/directly using link descriptor
222seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
223
224 // directly send message
225 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
226 logging_debug("Send: Sending message via Base Communication");
227 return bc->sendMessage( ld->communicationId, message );
228 }
229
230 // relay message
231 else if ( ld->relay ) {
232
233 // sending a relayed message
234 logging_debug("Send: Relaying message to node "
235 << ld->remoteNode.toString()
236 << " using relay " << ld->localRelay
237 );
238
239 // create a information relay message to inform the relay about
240 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
241 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
242 relayMsg.encapsulate( message );
243 overlay_msg.encapsulate( &relayMsg );
244
245 // route message to relay node in order to inform it!
246 logging_debug("sendMessage: Sending message over relayed link with" << ld );
247 sendOverlay( &overlay_msg, ld->localRelay );
248 return 0;
249 }
250
251 // error
252 else {
253 logging_error( "Could not send message descriptor=" << ld );
254 return -1;
255 }
256 return -1;
257}
258
259/// creates a link descriptor, apply relay semantics if possible
260LinkDescriptor* BaseOverlay::createLinkDescriptor(
261 const NodeID remoteNode, const ServiceID service, const LinkID link_id ) {
262
263 // find listener
264 if( !communicationListeners.contains( service ) ) {
265 logging_error( "No listener found for service " << service.toString() );
266 return NULL;
267 }
268 CommunicationListener* listener = communicationListeners.get( service );
269 assert( listener != NULL );
270
271 // copy link id
272 LinkID linkid = link_id;
273
274 // create link id if necessary
275 if ( linkid.isUnspecified() )
276 linkid = LinkID::create();
277
278 // create relay link descriptor
279 NodeID relayNode = getRelayNode(remoteNode);
280
281 // add descriptor
282 LinkDescriptor* ld = addDescriptor( linkid );
283 ld->overlayId = linkid;
284 ld->service = service;
285 ld->listener = listener;
286 ld->remoteNode = remoteNode;
287
288 // set relay node if available
289 ld->relay = !relayNode.isUnspecified();
290 ld->localRelay = relayNode;
291
292 if (!ld->relay)
293 logging_error("No relay found!");
294
295 // debug output
296 logging_debug( "Created link descriptor: " << ld );
297
298 return ld;
299}
300
301
302// ----------------------------------------------------------------------------
303
304use_logging_cpp(BaseOverlay);
305
306// ----------------------------------------------------------------------------
307
308BaseOverlay::BaseOverlay() :
309 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
310 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
311 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
312}
313
314BaseOverlay::~BaseOverlay() {
315}
316
317// ----------------------------------------------------------------------------
318
319void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
320 logging_info("Starting...");
321
322 // set parameters
323 bc = &_basecomm;
324 nodeId = _nodeid;
325
326 // register at base communication
327 bc->registerMessageReceiver( this );
328 bc->registerEventListener( this );
329
330 // timer for auto link management
331 Timer::setInterval( 1000 );
332 Timer::start();
333
334 started = true;
335 state = BaseOverlayStateInvalid;
336}
337
338void BaseOverlay::stop() {
339 logging_info("Stopping...");
340
341 // stop timer
342 Timer::stop();
343
344 // delete oberlay interface
345 if(overlayInterface != NULL) {
346 delete overlayInterface;
347 overlayInterface = NULL;
348 }
349
350 // unregister at base communication
351 bc->unregisterMessageReceiver( this );
352 bc->unregisterEventListener( this );
353
354 started = false;
355 state = BaseOverlayStateInvalid;
356}
357
358bool BaseOverlay::isStarted(){
359 return started;
360}
361
362// ----------------------------------------------------------------------------
363
364void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
365 const EndpointDescriptor& bootstrapEp) {
366
367 if(id != spovnetId){
368 logging_error("attempt to join against invalid spovnet, call initiate first");
369 return;
370 }
371
372 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
373 logging_info( "Starting to join spovnet " << id.toString() <<
374 " with nodeid " << nodeId.toString());
375
376 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
377
378 // bootstrap against ourselfs
379 logging_info("joining spovnet locally");
380
381 overlayInterface->joinOverlay();
382 state = BaseOverlayStateCompleted;
383 BOOST_FOREACH( NodeListener* i, nodeListeners )
384 i->onJoinCompleted( spovnetId );
385
386 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
387 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
388
389 logging_debug("starting overlay bootstrap module");
390 overlayBootstrap.start(this, spovnetId, nodeId);
391 overlayBootstrap.publish(bc->getEndpointDescriptor());
392
393 } else {
394
395 // bootstrap against another node
396 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
397
398 const LinkID& lnk = bc->establishLink( bootstrapEp );
399 bootstrapLinks.push_back(lnk);
400
401 logging_info("join process initiated for " << id.toString() << "...");
402
403 }
404}
405
406void BaseOverlay::leaveSpoVNet() {
407
408 logging_info( "Leaving spovnet " << spovnetId );
409 bool ret = ( state != this->BaseOverlayStateInvalid );
410
411 logging_debug("stopping overlay bootstrap module");
412 overlayBootstrap.stop();
413 overlayBootstrap.revoke();
414
415 logging_debug( "Dropping all auto-links" );
416
417 // gather all service links
418 vector<LinkID> servicelinks;
419 BOOST_FOREACH( LinkDescriptor* ld, links ) {
420 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
421 servicelinks.push_back( ld->overlayId );
422 }
423
424 // drop all service links
425 BOOST_FOREACH( LinkID lnk, servicelinks )
426 dropLink( lnk );
427
428 // let the node leave the spovnet overlay interface
429 logging_debug( "Leaving overlay" );
430 if( overlayInterface != NULL )
431 overlayInterface->leaveOverlay();
432
433 // drop still open bootstrap links
434 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
435 bc->dropLink( lnk );
436
437 // change to inalid state
438 state = BaseOverlayStateInvalid;
439 //ovl.visShutdown( ovlId, nodeId, string("") );
440
441 // inform all registered services of the event
442 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
443 if( ret ) i->onLeaveCompleted( spovnetId );
444 else i->onLeaveFailed( spovnetId );
445 }
446}
447
448void BaseOverlay::createSpoVNet(const SpoVNetID& id,
449 const OverlayParameterSet& param,
450 const SecurityParameterSet& sec,
451 const QoSParameterSet& qos) {
452
453 // set the state that we are an initiator, this way incoming messages are
454 // handled correctly
455 logging_info( "creating spovnet " + id.toString() <<
456 " with nodeid " << nodeId.toString() );
457
458 spovnetId = id;
459
460 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
461 if( overlayInterface == NULL ) {
462 logging_fatal( "overlay structure not supported" );
463 state = BaseOverlayStateInvalid;
464
465 BOOST_FOREACH( NodeListener* i, nodeListeners )
466 i->onJoinFailed( spovnetId );
467
468 return;
469 }
470}
471
472// ----------------------------------------------------------------------------
473
474const LinkID BaseOverlay::establishLink(
475 const EndpointDescriptor& ep, const NodeID& nodeid,
476 const ServiceID& service, const NodeID& remoteRelay, const LinkID& linkid ) {
477
478 LinkID link_id = linkid;
479
480 // establish link via overlay
481 if (!nodeid.isUnspecified())
482 link_id = establishLink( nodeid, service, remoteRelay, link_id );
483
484 // establish link directly if only ep is known
485 if (nodeid.isUnspecified())
486 establishDirectLink( ep, service, link_id );
487
488 return link_id;
489}
490
491/// call base communication's establish link and add link mapping
492const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
493 const ServiceID& service, const LinkID& linkid ) {
494
495 // create a new link id if necessary
496 LinkID link_id = linkid;
497 if (link_id.isUnspecified()) link_id = LinkID::create();
498
499 /// find a service listener
500 if( !communicationListeners.contains( service ) ) {
501 logging_error( "No listener registered for service id=" << service.toString() );
502 return LinkID::UNSPECIFIED;
503 }
504 CommunicationListener* listener = communicationListeners.get( service );
505 assert( listener != NULL );
506
507 /// establish link and add mapping
508 logging_info("Establishing direct link " << link_id.toString()
509 << " using " << ep.toString());
510
511 // create descriptor
512 LinkDescriptor* ld = addDescriptor( link_id );
513 ld->overlayId = link_id;
514 ld->communicationId = link_id;
515 ld->listener = listener;
516 ld->service = service;
517 bc->establishLink( ep, link_id );
518
519 return link_id;
520}
521
522/// establishes a link between two arbitrary nodes
523const LinkID BaseOverlay::establishLink( const NodeID& node,
524 const ServiceID& service, const NodeID& remoteRelay, const LinkID& link_id ) {
525
526 // do not establish a link to myself!
527 if (node == nodeId) return LinkID::UNSPECIFIED;
528
529 // create a link descriptor
530 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
531 ld->remoteRelay = remoteRelay;
532
533 // create link request message with own link id
534 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
535 LinkRequest link_request_msg(
536 nonce, &bc->getEndpointDescriptor(), false,
537 ld->overlayId, ld->localRelay );
538 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
539 overlay_msg.encapsulate( &link_request_msg );
540 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
541
542 // debug message
543 logging_debug(
544 "Sending link request with"
545 << " link id=" << ld->overlayId
546 << " node id=" << ld->remoteNode.toString()
547 << " service id=" << ld->service.toString()
548 << " local relay id=" << ld->localRelay.toString()
549 << " nonce= " << nonce
550 );
551
552 // sending message through new link
553 sendMessage( &overlay_msg, ld );
554
555 return ld->overlayId;
556}
557
558/// drops an established link
559void BaseOverlay::dropLink(const LinkID& link) {
560 logging_debug( "Dropping link (initiated locally):" << link.toString() );
561
562 // find the link item to drop
563 LinkDescriptor* ld = getDescriptor(link);
564 if( ld == NULL ) {
565 logging_warn( "Can't drop link, link is unknown!");
566 return;
567 }
568
569 // delete all queued messages
570 if( ld->messageQueue.size() > 0 ) {
571 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
572 << ld->messageQueue.size() << " waiting messages" );
573 ld->flushQueue();
574 }
575
576 // inform sideport and listener
577 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
578 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
579
580 // do not drop relay links
581 if (!ld->usedAsRelay) {
582 // drop the link in base communication
583 if (ld->communicationUp) bc->dropLink( ld->communicationId );
584
585 // erase descriptor
586 eraseDescriptor( ld->overlayId );
587 } else
588 ld->dropWhenRelaysLeft = true;
589}
590
591// ----------------------------------------------------------------------------
592
593/// internal send message, always use this functions to send messages over links
594seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
595 logging_debug( "Sending data message on link " << link.toString() );
596
597 // get the mapping for this link
598 LinkDescriptor* ld = getDescriptor(link);
599 if( ld == NULL ) {
600 logging_error("Could not send message. "
601 << "Link not found id=" << link.toString());
602 return -1;
603 }
604
605 // check if the link is up yet, if its an auto link queue message
606 if( !ld->up ) {
607 ld->markAsUsed();
608 if( ld->autolink ) {
609 logging_info("Auto-link " << link.toString() << " not up, queue message");
610 Data data = data_serialize( message );
611 const_cast<Message*>(message)->dropPayload();
612 ld->messageQueue.push_back( new Message(data) );
613 } else {
614 logging_error("Link " << link.toString() << " not up, drop message");
615 }
616 return -1;
617 }
618
619 // compile overlay message (has service and node id)
620 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
621 overmsg.encapsulate( const_cast<Message*>(message) );
622
623 // send message over relay/direct/overlay
624 return sendMessage( &overmsg, ld );
625}
626
627seqnum_t BaseOverlay::sendMessage(const Message* message,
628 const NodeID& node, const ServiceID& service) {
629
630 // find link for node and service
631 LinkDescriptor* ld = getAutoDescriptor( node, service );
632
633 // if we found no link, create an auto link
634 if( ld == NULL ) {
635
636 // debug output
637 logging_info( "No link to send message to node "
638 << node.toString() << " found for service "
639 << service.toString() << ". Creating auto link ..."
640 );
641
642 // this will call onlinkup on us, if everything worked we now have a mapping
643 LinkID link = LinkID::create();
644
645 // call base overlay to create a link
646 link = establishLink( node, service, NodeID::UNSPECIFIED, link );
647 ld = getDescriptor( link );
648 if( ld == NULL ) {
649 logging_error( "Failed to establish auto-link.");
650 return -1;
651 }
652 ld->autolink = true;
653
654 logging_debug( "Auto-link establishment in progress to node "
655 << node.toString() << " with link id=" << link.toString() );
656 }
657 assert(ld != NULL);
658
659 // mark the link as used, as we now send a message through it
660 ld->markAsUsed();
661
662 // send / queue message
663 return sendMessage( message, ld->overlayId );
664}
665
666// ----------------------------------------------------------------------------
667
668const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
669 const LinkID& link) const {
670
671 // return own end-point descriptor
672 if( link == LinkID::UNSPECIFIED )
673 return bc->getEndpointDescriptor();
674
675 // find link descriptor. not found -> return unspecified
676 const LinkDescriptor* ld = getDescriptor(link);
677 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
678
679 // return endpoint-descriptor from base communication
680 return bc->getEndpointDescriptor( ld->communicationId );
681}
682
683const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
684 const NodeID& node) const {
685
686 // return own end-point descriptor
687 if( node == nodeId || node == NodeID::UNSPECIFIED )
688 return bc->getEndpointDescriptor();
689
690 // no joined and request remote descriptor? -> fail!
691 if( overlayInterface == NULL ) {
692 logging_error( "overlay interface not set, cannot resolve endpoint" );
693 return EndpointDescriptor::UNSPECIFIED();
694 }
695
696 // resolve end-point descriptor from the base-overlay routing table
697 return overlayInterface->resolveNode( node );
698}
699
700// ----------------------------------------------------------------------------
701
702bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
703 sideport = _sideport;
704 _sideport->configure( this );
705}
706
707bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
708 sideport = &SideportListener::DEFAULT;
709}
710
711// ----------------------------------------------------------------------------
712
713bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
714 logging_debug( "binding communication listener " << listener
715 << " on serviceid " << sid.toString() );
716
717 if( communicationListeners.contains( sid ) ) {
718 logging_error( "some listener already registered for service id "
719 << sid.toString() );
720 return false;
721 }
722
723 communicationListeners.registerItem( listener, sid );
724 return true;
725}
726
727
728bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
729 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
730
731 if( !communicationListeners.contains( sid ) ) {
732 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
733 return false;
734 }
735
736 if( communicationListeners.get(sid) != listener ) {
737 logging_warn( "listener bound to service id " << sid.toString()
738 << " is different than listener trying to unbind" );
739 return false;
740 }
741
742 communicationListeners.unregisterItem( sid );
743 return true;
744}
745
746// ----------------------------------------------------------------------------
747
748bool BaseOverlay::bind(NodeListener* listener) {
749 logging_debug( "Binding node listener " << listener );
750
751 // already bound? yes-> warning
752 NodeListenerVector::iterator i =
753 find( nodeListeners.begin(), nodeListeners.end(), listener );
754 if( i != nodeListeners.end() ) {
755 logging_warn("Node listener " << listener << " is already bound!" );
756 return false;
757 }
758
759 // no-> add
760 nodeListeners.push_back( listener );
761 return true;
762}
763
764bool BaseOverlay::unbind(NodeListener* listener) {
765 logging_debug( "Unbinding node listener " << listener );
766
767 // already unbound? yes-> warning
768 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
769 if( i == nodeListeners.end() ) {
770 logging_warn( "Node listener " << listener << " is not bound!" );
771 return false;
772 }
773
774 // no-> remove
775 nodeListeners.erase( i );
776 return true;
777}
778
779// ----------------------------------------------------------------------------
780
781void BaseOverlay::onLinkUp(const LinkID& id,
782 const address_v* local, const address_v* remote) {
783 logging_debug( "Link up with base communication link id=" << id );
784
785 // get descriptor for link
786 LinkDescriptor* ld = getDescriptor(id, true);
787
788 // handle bootstrap link we initiated
789 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
790 logging_info(
791 "Join has been initiated by me and the link is now up. " <<
792 "Sending out join request for SpoVNet " << spovnetId.toString()
793 );
794
795 // send join request message
796 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
797 JoinRequest joinRequest( spovnetId, nodeId );
798 overlayMsg.encapsulate( &joinRequest );
799 bc->sendMessage( id, &overlayMsg );
800 return;
801 }
802
803 // no link found? -> link establishment from remote, add one!
804 if (ld == NULL) {
805 ld = addDescriptor( id );
806 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
807
808 // update descriptor
809 ld->fromRemote = true;
810 ld->communicationId = id;
811 ld->communicationUp = true;
812 ld->markAsUsed();
813
814 // in this case, do not inform listener, since service it unknown
815 // -> wait for update message!
816
817 // link mapping found? -> send update message with node-id and service id
818 } else {
819 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
820
821 // note: necessary to validate the link on the remote side!
822 logging_debug( "Sending out update" <<
823 " for service " << ld->service.toString() <<
824 " with local node id " << nodeId.toString() <<
825 " on link " << ld->overlayId.toString() );
826
827 // update descriptor
828 ld->markAsUsed();
829 ld->communicationUp = true;
830
831 // if link is a relayed link ->convert to direct link
832 if (ld->relay && !ld->remoteLinkId.isUnspecified() ) {
833 logging_info( "Converting to direct link: " << ld );
834 ld->up = true;
835 ld->relay = false;
836 ld->localRelay = NodeID::UNSPECIFIED;
837 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
838 overMsg.setRelayLink( ld->remoteLinkId );
839 bc->sendMessage( ld->communicationId, &overMsg );
840 }
841
842 // compile and send update message
843 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
844 overlayMsg.setAutoLink( ld->autolink );
845 bc->sendMessage( ld->communicationId, &overlayMsg );
846 }
847}
848
849void BaseOverlay::onLinkDown(const LinkID& id,
850 const address_v* local, const address_v* remote) {
851
852 // erase bootstrap links
853 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
854 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
855
856 // get descriptor for link
857 LinkDescriptor* ld = getDescriptor(id, true);
858 if ( ld == NULL ) return; // not found? ->ignore!
859 logging_info( "onLinkDown descriptor: " << ld );
860
861 // inform listeners about link down
862 ld->communicationUp = false;
863 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
864 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
865
866 // delete all queued messages (auto links)
867 if( ld->messageQueue.size() > 0 ) {
868 logging_warn( "Dropping link " << id.toString() << " that has "
869 << ld->messageQueue.size() << " waiting messages" );
870 ld->flushQueue();
871 }
872
873 // erase mapping
874 eraseDescriptor(ld->overlayId);
875}
876
877void BaseOverlay::onLinkChanged(const LinkID& id,
878 const address_v* oldlocal, const address_v* newlocal,
879 const address_v* oldremote, const address_v* newremote) {
880
881 // get descriptor for link
882 LinkDescriptor* ld = getDescriptor(id, true);
883 if ( ld == NULL ) return; // not found? ->ignore!
884 logging_debug( "onLinkChanged descriptor: " << ld );
885
886 // inform listeners
887 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
888 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
889
890 // autolinks: refresh timestamp
891 ld->markAsUsed();
892}
893
894void BaseOverlay::onLinkFail(const LinkID& id,
895 const address_v* local, const address_v* remote) {
896 logging_debug( "Link fail with base communication link id=" << id );
897
898 // erase bootstrap links
899 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
900 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
901
902 // get descriptor for link
903 LinkDescriptor* ld = getDescriptor(id, true);
904 if ( ld == NULL ) return; // not found? ->ignore!
905 logging_debug( "Link failed id=" << ld->overlayId.toString() );
906
907 // inform listeners
908 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
909 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
910
911 // autolinks: refresh timestamp
912 ld->markAsUsed();
913}
914
915void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
916 const address_v* remote, const QoSParameterSet& qos) {
917 logging_debug( "Link quality changed with base communication link id=" << id );
918
919 // get descriptor for link
920 LinkDescriptor* ld = getDescriptor(id, true);
921 if ( ld == NULL ) return; // not found? ->ignore!
922 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
923
924 // autolinks: refresh timestamp
925 ld->markAsUsed();
926}
927
928bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
929 const address_v* remote ) {
930 logging_debug("Accepting link request from " << remote->to_string() );
931 return true;
932}
933
934/// handles a message from base communication
935bool BaseOverlay::receiveMessage(const Message* message,
936 const LinkID& link, const NodeID& ) {
937 // get descriptor for link
938 LinkDescriptor* ld = getDescriptor( link, true );
939
940 // link known?
941 if (ld == NULL) { // no-> handle with unspecified params
942 logging_debug("Received message from base communication, link descriptor unknown" );
943 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
944 } else { // yes -> handle with overlay link id
945 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
946 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
947 }
948}
949
950// ----------------------------------------------------------------------------
951
952/// handles a message from an overlay
953void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
954 logging_debug("Received message from overlay -- "
955 << " link id=" << link.toString()
956 << " node id=" << source.toString() );
957 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
958}
959
960// ----------------------------------------------------------------------------
961
962/// handles an incoming message
963bool BaseOverlay::handleMessage( const Message* message,
964 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
965 logging_debug( "Handling message: " << message->toString());
966
967 bool ret = false;
968
969 // decapsulate overlay message
970 OverlayMsg* overlayMsg =
971 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
972 if( overlayMsg == NULL ) return false;
973
974 // mark the link as in action
975 LinkDescriptor* ld = getDescriptor(boLink);
976 if (ld == NULL) ld = getDescriptor(bcLink, true);
977 if (ld != NULL) {
978 ld->markAsUsed();
979 ld->markAlive();
980 }
981
982 switch ( overlayMsg->getType() ) {
983 // ---------------------------------------------------------------------
984 // Handle spovnet instance join requests
985 // ---------------------------------------------------------------------
986 case OverlayMsg::typeJoinRequest: {
987
988 // decapsulate message
989 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
990 logging_info( "Received join request for spovnet " <<
991 joinReq->getSpoVNetID().toString() );
992
993 // check spovnet id
994 if( joinReq->getSpoVNetID() != spovnetId ) {
995 logging_error(
996 "Received join request for spovnet we don't handle " <<
997 joinReq->getSpoVNetID().toString() );
998 ret = false;
999 break;
1000 }
1001
1002 // TODO: here you can implement mechanisms to deny joining of a node
1003 bool allow = true;
1004 logging_info( "Sending join reply for spovnet " <<
1005 spovnetId.toString() << " to node " <<
1006 overlayMsg->getSourceNode().toString() <<
1007 ". Result: " << (allow ? "allowed" : "denied") );
1008 joiningNodes.push_back( overlayMsg->getSourceNode() );
1009
1010 // return overlay parameters
1011 assert( overlayInterface != NULL );
1012 logging_debug( "Using bootstrap end-point "
1013 << getEndpointDescriptor().toString() )
1014 OverlayParameterSet parameters = overlayInterface->getParameters();
1015 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
1016 JoinReply replyMsg( spovnetId, parameters,
1017 allow, getEndpointDescriptor() );
1018 retmsg.encapsulate(&replyMsg);
1019 bc->sendMessage( bcLink, &retmsg );
1020 ret = true;
1021 break;
1022 }
1023
1024 // ---------------------------------------------------------------------
1025 // handle replies to spovnet instance join requests
1026 // ---------------------------------------------------------------------
1027 case OverlayMsg::typeJoinReply: {
1028
1029 // decapsulate message
1030 logging_debug("received join reply message");
1031 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1032
1033 // correct spovnet?
1034 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1035 logging_error( "Received SpoVNet join reply for " <<
1036 replyMsg->getSpoVNetID().toString() <<
1037 " != " << spovnetId.toString() );
1038 ret = false;
1039 delete replyMsg;
1040 break;
1041 }
1042
1043 // access granted? no -> fail
1044 if( !replyMsg->getJoinAllowed() ) {
1045 logging_error( "Our join request has been denied" );
1046
1047 // drop initiator link
1048 if(bcLink != LinkID::UNSPECIFIED){
1049 bc->dropLink( bcLink );
1050
1051 vector<LinkID>::iterator it = std::find(
1052 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1053 if( it != bootstrapLinks.end() )
1054 bootstrapLinks.erase(it);
1055 }
1056
1057 // inform all registered services of the event
1058 BOOST_FOREACH( NodeListener* i, nodeListeners )
1059 i->onJoinFailed( spovnetId );
1060
1061 ret = true;
1062 delete replyMsg;
1063 break;
1064 }
1065
1066 // access has been granted -> continue!
1067 logging_info("Join request has been accepted for spovnet " <<
1068 spovnetId.toString() );
1069
1070 logging_debug( "Using bootstrap end-point "
1071 << replyMsg->getBootstrapEndpoint().toString() );
1072
1073 //
1074 // create overlay structure from spovnet parameter set
1075 // if we have not boostrapped yet against some other node
1076 //
1077
1078 if( overlayInterface == NULL ){
1079
1080 logging_debug("first-time bootstrapping");
1081
1082 overlayInterface = OverlayFactory::create(
1083 *this, replyMsg->getParam(), nodeId, this );
1084
1085 // overlay structure supported? no-> fail!
1086 if( overlayInterface == NULL ) {
1087 logging_error( "overlay structure not supported" );
1088
1089 if(bcLink != LinkID::UNSPECIFIED){
1090 bc->dropLink( bcLink );
1091
1092 vector<LinkID>::iterator it = std::find(
1093 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1094 if( it != bootstrapLinks.end() )
1095 bootstrapLinks.erase(it);
1096 }
1097
1098 // inform all registered services of the event
1099 BOOST_FOREACH( NodeListener* i, nodeListeners )
1100 i->onJoinFailed( spovnetId );
1101
1102 delete replyMsg;
1103 ret = true;
1104 break;
1105 }
1106
1107 // everything ok-> join the overlay!
1108 state = BaseOverlayStateCompleted;
1109 overlayInterface->createOverlay();
1110 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1111
1112 // update ovlvis
1113 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1114
1115 // inform all registered services of the event
1116 BOOST_FOREACH( NodeListener* i, nodeListeners )
1117 i->onJoinCompleted( spovnetId );
1118
1119 } else {
1120
1121 // this is not the first bootstrap, just join the additional node
1122 logging_debug("not first-time bootstrapping");
1123 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1124
1125 } // if( overlayInterface == NULL )
1126
1127 //record bootstrap ep as good endpoint to join
1128 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1129
1130 delete replyMsg;
1131 ret = true;
1132 break;
1133 }
1134
1135 // ---------------------------------------------------------------------
1136 // handle data forward messages
1137 // ---------------------------------------------------------------------
1138 case OverlayMsg::typeData: {
1139
1140 // get service
1141 const ServiceID& service = overlayMsg->getService();
1142 logging_debug( "received data for service " << service.toString() );
1143
1144 // find listener
1145 CommunicationListener* listener =
1146 communicationListeners.get( service );
1147 if( listener == NULL ) {
1148 ret = true;
1149 break;
1150 }
1151
1152 // delegate data message
1153 listener->onMessage( overlayMsg,
1154 overlayMsg->getSourceNode(), ld->overlayId );
1155
1156 ret = true;
1157 break;
1158 }
1159
1160 // ---------------------------------------------------------------------
1161 // handle update messages for link establishment
1162 // ---------------------------------------------------------------------
1163 case OverlayMsg::typeUpdate: {
1164 // get info
1165 const NodeID& sourcenode = overlayMsg->getSourceNode();
1166 const ServiceID& service = overlayMsg->getService();
1167
1168 // no link descriptor available -> error!
1169 if( ld == NULL ) {
1170 logging_warn( "received overlay update message for link for "
1171 << "which we have no mapping" );
1172 ret = false;
1173 break;
1174 }
1175 logging_debug("Received type update message on link " << ld );
1176
1177 // update our link mapping information for this link
1178 bool changed =
1179 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1180 ld->remoteNode = sourcenode;
1181 ld->service = service;
1182 ld->autolink = overlayMsg->isAutoLink();
1183
1184 // if our link information changed, we send out an update, too
1185 if( changed ) {
1186 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1187 overMsg.setAutoLink(ld->autolink);
1188 bc->sendMessage( ld->communicationId, &overMsg );
1189 }
1190
1191 // service registered? no-> error!
1192 if( !communicationListeners.contains( service ) ) {
1193 logging_warn( "Link up: event listener has not been registered" );
1194 ret = false;
1195 break;
1196 }
1197
1198 // default or no service registered?
1199 CommunicationListener* listener = communicationListeners.get( service );
1200 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1201 logging_warn("Link up: event listener is default or null!" );
1202 ret = true;
1203 break;
1204 }
1205
1206 // update descriptor
1207 ld->listener = listener;
1208 ld->markAsUsed();
1209 ld->markAlive();
1210
1211 // ask the service whether it wants to accept this link
1212 if( !listener->onLinkRequest(sourcenode) ) {
1213
1214 logging_debug("Link id=" << ld->overlayId.toString() <<
1215 " has been denied by service " << service.toString() << ", dropping link");
1216
1217 // prevent onLinkDown calls to the service
1218 ld->listener = &CommunicationListener::DEFAULT;
1219
1220 // drop the link
1221 dropLink( ld->overlayId );
1222 ret = true;
1223 break;
1224 }
1225
1226 // set link up
1227 ld->up = true;
1228 logging_debug(
1229 "Link " << ld->overlayId.toString()
1230 << " has been accepted by service " << service.toString()
1231 << " and is now up"
1232 );
1233
1234 // auto links: link has been accepted -> send queued messages
1235 if( ld->messageQueue.size() > 0 ) {
1236 logging_info( "sending out queued messages on link " <<
1237 ld->overlayId.toString() );
1238 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1239 sendMessage( msg, ld->overlayId );
1240 delete msg;
1241 }
1242 ld->messageQueue.clear();
1243 }
1244
1245 // call the notification functions
1246 listener->onLinkUp( ld->overlayId, sourcenode );
1247 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1248
1249 ret = true;
1250 break;
1251 }
1252
1253 // ---------------------------------------------------------------------
1254 // handle link request forwarded through the overlay
1255 // ---------------------------------------------------------------------
1256 case OverlayMsg::typeLinkRequest: {
1257
1258 logging_debug( "received link request on link" );
1259
1260 // decapsulate message
1261 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1262 const ServiceID& service = overlayMsg->getService();
1263
1264 // is request reply?
1265 if ( linkReq->isReply() ) {
1266
1267 // find link
1268 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1269 if ( i == pendingLinks.end() ) {
1270 logging_error( "Nonce not found in link request" );
1271 ret = true;
1272 break;
1273 }
1274
1275 // debug message
1276 logging_debug( "Link request reply received. Establishing link "
1277 << i->second << " to " << (linkReq->getEndpoint()->toString())
1278 << " for service " << service.toString()
1279 << " with nonce " << linkReq->getNonce()
1280 << " using relay " << linkReq->getRelay().toString()
1281 << " and remote link id=" << linkReq->getRemoteLinkId()
1282 );
1283
1284 // get descriptor
1285 LinkDescriptor* ldn = getDescriptor(i->second);
1286 if (ldn==NULL) {
1287 delete linkReq;
1288 ret = true;
1289 break;
1290 }
1291
1292 // check if link request reply has a relay node ...
1293 if (!linkReq->getRelay().isUnspecified()) { // yes->
1294 ldn->up = true;
1295 ldn->relay = true;
1296 if (ldn->localRelay.isUnspecified()) {
1297 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1298 showLinkState();
1299 }
1300 ldn->remoteRelay = linkReq->getRelay();
1301 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1302 ldn->remoteNode = overlayMsg->getSourceNode();
1303
1304 ldn->markAlive();
1305
1306 // compile and send update message
1307 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1308 _overlayMsg.setAutoLink(ldn->autolink);
1309 sendMessage( &_overlayMsg, ldn );
1310
1311 // auto links: link has been accepted -> send queued messages
1312 if( ldn->messageQueue.size() > 0 ) {
1313 logging_info( "Sending out queued messages on link " <<
1314 ldn->overlayId.toString() );
1315 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1316 sendMessage( msg, ldn->overlayId );
1317 delete msg;
1318 }
1319 ldn->messageQueue.clear();
1320 }
1321
1322 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1323
1324 // try to establish a direct link
1325 ldn->communicationId =
1326 bc->establishLink( *linkReq->getEndpoint(), i->second );
1327 }
1328
1329 // no relay node-> use overlay routing
1330 else {
1331 ldn->up = true;
1332
1333 // establish direct link
1334 ldn->communicationId =
1335 bc->establishLink( *linkReq->getEndpoint(), i->second );
1336 }
1337 } else {
1338 logging_debug( "Link request received from node id="
1339 << overlayMsg->getSourceNode() );
1340
1341 // create link descriptor
1342 LinkDescriptor* ldn =
1343 createLinkDescriptor(overlayMsg->getSourceNode(),
1344 overlayMsg->getService(), LinkID::UNSPECIFIED );
1345 assert(!ldn->overlayId.isUnspecified());
1346
1347 // create reply message
1348 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1349 LinkRequest link_request_msg(
1350 linkReq->getNonce(),
1351 &bc->getEndpointDescriptor(),
1352 true, ldn->overlayId, ldn->localRelay
1353 );
1354 overlay_msg.encapsulate( &link_request_msg );
1355
1356 // debug message
1357 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1358 linkReq->getNonce() );
1359
1360 // if this is a relay link-> update information & inform listeners
1361 if (!linkReq->getRelay().isUnspecified()) {
1362 // set flags
1363 ldn->up = true;
1364 ldn->relay = true;
1365 if (ldn->localRelay.isUnspecified()) {
1366 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1367 showLinkState();
1368 }
1369 ldn->remoteRelay = linkReq->getRelay();
1370 ldn->remoteNode = overlayMsg->getSourceNode();
1371 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1372 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1373 }
1374
1375 // route message back over overlay
1376 sendMessage( &overlay_msg, ldn );
1377 }
1378 delete linkReq;
1379 ret = true;
1380 break;
1381 }
1382
1383 // ---------------------------------------------------------------------
1384 // handle relay message to forward messages
1385 // ---------------------------------------------------------------------
1386 case OverlayMsg::typeRelay: {
1387
1388 logging_debug( "received relay request on link" );
1389
1390 // decapsulate message
1391 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1392
1393 // is relay message informative?
1394 switch (relayMsg->getType()) {
1395
1396 // handle relay notification
1397 case RelayMessage::typeInform: {
1398 logging_info("Received relay information message with"
1399 << " relay " << relayMsg->getRelayNode()
1400 << " destination " << relayMsg->getDestNode() );
1401
1402 // mark incoming link as relay
1403 if (ld!=NULL) ld->markAsRelay();
1404
1405 // am I the destination of this message? yes->
1406 if (relayMsg->getDestNode() == nodeId ) {
1407 // deliver relay message locally!
1408 logging_debug("Relay message reached destination. Handling the message.");
1409 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1410 ret = true;
1411 break;
1412 }
1413
1414 // create route message
1415 OverlayMsg _overMsg( *overlayMsg );
1416 RelayMessage _relayMsg( *relayMsg );
1417 _relayMsg.setType( RelayMessage::typeRoute );
1418 _overMsg.encapsulate( &_relayMsg );
1419
1420 // forward message
1421 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1422 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1423 sendOverlay( &_overMsg, relayMsg->getDestNode() );
1424 } else {
1425 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1426 sendOverlay( &_overMsg, relayMsg->getRelayNode() );
1427 }
1428 ret = true;
1429 break;
1430 }
1431
1432 // handle relay routing
1433 case RelayMessage::typeRoute: {
1434 logging_info("Received relay route message with"
1435 << " relay " << relayMsg->getRelayNode()
1436 << " destination " << relayMsg->getDestNode() );
1437
1438 // mark incoming link as relay
1439 if (ld!=NULL) ld->markAsRelay();
1440
1441 // am I the destination of this message? yes->
1442 if (relayMsg->getDestNode() == nodeId ) {
1443 // deliver relay message locally!
1444 logging_debug("Relay message reached destination. Handling the message.");
1445 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1446 ret = true;
1447 break;
1448 }
1449
1450 // am I the relay for this message? yes->
1451 if (relayMsg->getRelayNode() == nodeId ) {
1452 logging_debug("I'm the relay for this message. Sending to destination.");
1453 OverlayMsg _overMsg( *overlayMsg );
1454 RelayMessage _relayMsg( *relayMsg );
1455 _overMsg.encapsulate(&_relayMsg);
1456
1457 /// this must be handled by using relay link!
1458 sendOverlay(&_overMsg, relayMsg->getDestNode());
1459 ret = true;
1460 break;
1461 }
1462
1463 // error: I'm not a relay or destination!
1464 logging_error("This node is neither relay nor destination. Dropping Message!");
1465 ret = true;
1466 break;
1467 }
1468 default: {
1469 logging_error("RelayMessage Unknown!");
1470 ret = true;
1471 break;
1472 }
1473 }
1474 delete relayMsg;
1475 break;
1476 }
1477
1478 // ---------------------------------------------------------------------
1479 // handle keep-alive messages
1480 // ---------------------------------------------------------------------
1481 case OverlayMsg::typeKeepAlive: {
1482 logging_debug( "received keep-alive on link" );
1483 if ( ld != NULL ) {
1484 logging_info("Keep-Alive for "<< ld->overlayId);
1485 ld->markAlive();
1486 }
1487 break;
1488 }
1489
1490 // ---------------------------------------------------------------------
1491 // handle direct link replacement messages
1492 // ---------------------------------------------------------------------
1493 case OverlayMsg::typeDirectLink: {
1494
1495 logging_debug( "Received direct link replacement request" );
1496
1497 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1498 if (rld == NULL || ld == NULL) {
1499 logging_error("Direct link replacement: Link "
1500 << overlayMsg->getRelayLink() << "not found error." );
1501 break;
1502 }
1503 logging_info( "Received direct link convert notification for " << rld );
1504
1505 // set communcation link id and set it up
1506 rld->communicationId = ld->communicationId;
1507
1508 // this is neccessary since this link was a relay link before!
1509 rld->communicationUp = true;
1510
1511 // this is not a relay link anymore!
1512 rld->relay = false;
1513 rld->localRelay = NodeID::UNSPECIFIED;
1514 rld->remoteRelay = NodeID::UNSPECIFIED;
1515
1516 // mark used and alive!
1517 rld->markAsUsed();
1518 rld->markAlive();
1519
1520 // erase the original descriptor
1521 eraseDescriptor(ld->overlayId);
1522 break;
1523 }
1524
1525 // ---------------------------------------------------------------------
1526 // handle unknown message type
1527 // ---------------------------------------------------------------------
1528 default: {
1529 logging_error( "received message in invalid state! don't know " <<
1530 "what to do with this message of type " <<
1531 overlayMsg->getType() );
1532 ret = false;
1533 break;
1534 }
1535 } /* switch */
1536
1537 delete overlayMsg;
1538 return ret;
1539}
1540
1541// ----------------------------------------------------------------------------
1542
1543void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1544
1545 logging_debug( "broadcasting message to all known nodes " <<
1546 "in the overlay from service " + service.toString() );
1547
1548 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1549 OverlayInterface::NodeList::iterator i = nodes.begin();
1550 for(; i != nodes.end(); i++ ) {
1551 if( *i == nodeId) continue; // don't send to ourselfs
1552 sendMessage( message, *i, service );
1553 }
1554}
1555
1556vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1557
1558 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1559
1560 // the known nodes _can_ also include our node, so we remove ourself
1561 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1562 if( i != nodes.end() ) nodes.erase( i );
1563
1564 return nodes;
1565}
1566
1567const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1568 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1569 const LinkDescriptor* ld = getDescriptor(lid);
1570 if( ld == NULL ) return NodeID::UNSPECIFIED;
1571 else return ld->remoteNode;
1572}
1573
1574vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1575 vector<LinkID> linkvector;
1576 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1577 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1578 linkvector.push_back( ld->overlayId );
1579 }
1580 }
1581 return linkvector;
1582}
1583
1584
1585void BaseOverlay::onNodeJoin(const NodeID& node) {
1586 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1587 if( i == joiningNodes.end() ) return;
1588
1589 logging_info( "node has successfully joined baseoverlay and overlay structure "
1590 << node.toString() );
1591
1592 joiningNodes.erase( i );
1593}
1594
1595void BaseOverlay::eventFunction() {
1596
1597 // send keep-alive messages over established links
1598 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1599 if (!ld->up) continue;
1600 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1601 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1602 sendMessage( &overMsg, ld );
1603 }
1604
1605 // iterate over all links and check for time boundaries
1606 vector<LinkDescriptor*> oldlinks;
1607 time_t now = time(NULL);
1608 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1609 // remote used as relay flag
1610 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1611 ld->usedAsRelay = false;
1612
1613 // keep alives and not up? yes-> link connection request is stale!
1614 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1615
1616 // increase counter
1617 ld->keepAliveMissed++;
1618
1619 // missed more than four keep-alive messages (10 sec)? -> drop link
1620 if (ld->keepAliveMissed > 4) {
1621 logging_info( "Link connection request is stale, closing: " << ld );
1622 oldlinks.push_back( ld );
1623 continue;
1624 }
1625 }
1626
1627 if (!ld->up) continue;
1628
1629 // drop links that are dropped and not used as relay
1630 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1631 oldlinks.push_back( ld );
1632 continue;
1633 }
1634
1635 // auto-link time exceeded?
1636 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1637 oldlinks.push_back( ld );
1638 continue;
1639 }
1640
1641 // keep alives missed? yes->
1642 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1643
1644 // increase counter
1645 ld->keepAliveMissed++;
1646
1647 // missed more than four keep-alive messages (4 sec)? -> drop link
1648 if (ld->keepAliveMissed >= 8) {
1649 logging_info( "Link is stale, closing: " << ld );
1650 oldlinks.push_back( ld );
1651 continue;
1652 }
1653 }
1654 }
1655
1656 // drop links
1657 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1658/*
1659 vector<LinkID>::iterator it = std::find(
1660 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1661
1662 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1663 logging_info( "Not dropping initiator link: " << ld );
1664 continue;
1665 }
1666*/
1667 logging_info( "Link timed out. Dropping " << ld );
1668 dropLink( ld->overlayId );
1669 }
1670
1671 // show link state
1672 counter++;
1673 if (counter>=4) showLinkState();
1674 if (counter>=4 || counter<0) counter = 0;
1675}
1676
1677void BaseOverlay::showLinkState() {
1678 int i=0;
1679 logging_info("--- link state -------------------------------");
1680 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1681 logging_info("link " << i << ": " << ld);
1682 i++;
1683 }
1684 logging_info("----------------------------------------------");
1685}
1686
1687}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.