source: source/ariba/overlay/BaseOverlay.cpp@ 5751

Last change on this file since 5751 was 5751, checked in by mies, 15 years ago
File size: 51.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
63 BOOST_FOREACH( LinkDescriptor* lp, links )
64 if ((communication ? lp->communicationId : lp->overlayId) == link)
65 return lp;
66 return NULL;
67}
68
69const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
70 BOOST_FOREACH( const LinkDescriptor* lp, links )
71 if ((communication ? lp->communicationId : lp->overlayId) == link)
72 return lp;
73 return NULL;
74}
75
76/// erases a link descriptor
77void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
78 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
79 LinkDescriptor* ld = *i;
80 if ((communication ? ld->communicationId : ld->overlayId) == link) {
81 delete ld;
82 links.erase(i);
83 break;
84 }
85 }
86}
87
88/// adds a link descriptor
89LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
90 LinkDescriptor* desc = getDescriptor( link );
91 if ( desc == NULL ) {
92 desc = new LinkDescriptor();
93 desc->overlayId = link;
94 links.push_back(desc);
95 }
96 return desc;
97}
98
99/// returns a auto-link descriptor
100LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
101 BOOST_FOREACH( LinkDescriptor* lp, links )
102 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
103 return lp;
104 BOOST_FOREACH( LinkDescriptor* lp, links )
105 if (lp->autolink && lp->remoteNode == node && lp->service == service )
106 return lp;
107 return NULL;
108}
109
110/// returns a direct local link relay descriptor for the given remote node
111LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& id ) {
112
113 // get used next hop towards node
114 LinkDescriptor* rld = NULL;
115 NodeID relayNode = NodeID::UNSPECIFIED;
116 LinkID rlid = overlayInterface->getNextLinkId(id);
117 if ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
118 // get descriptor of first hop
119 rld = getDescriptor(rlid);
120
121 // is first hop a relay path use local relay
122 if ( rld->relay ) relayNode = rld->localRelay;
123
124 // no-> a proper relay node has been found
125 else relayNode = rld->remoteNode;
126 }
127
128 // if first relay is unknown choose a arbitrary direct node as relay
129 if ( relayNode.isUnspecified() ) {
130 for (size_t i=0; i<links.size(); i++)
131 if (links[i]->up &&
132 links[i]->communicationUp &&
133 !links[i]->relay &&
134 links[i]->keepAliveMissed <= 1 &&
135 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
136 relayNode = links[i]->remoteNode;
137 break;
138 }
139 }
140
141 // no local relay found-> damn!
142 if (relayNode.isUnspecified()) return NULL;
143
144 // get descriptor
145 BOOST_FOREACH( LinkDescriptor* lp, links )
146 if (lp->remoteNode == relayNode &&
147 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
148 lp->relay == false &&
149 lp->up)
150 return lp;
151
152 return NULL;
153}
154
155/// returns the link descriptor that is actually used for sending a message over the overöay
156LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) {
157 for (size_t i=0; i<links.size(); i++)
158 if ( !links[i]->relay &&
159 links[i]->up &&
160 links[i]->communicationUp &&
161 links[i]->keepAliveMissed <= 1 &&
162 links[i]->remoteNode == nodeid &&
163 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
164 return links[i];
165 }
166 LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid));
167 if (ld != NULL && ld->relay && follow)
168 return getSendDescriptor(ld->localRelay, false);
169 return NULL;
170}
171
172NodeID BaseOverlay::getRelayNode( const NodeID& remoteNode ) {
173 LinkDescriptor* rld = getRelayDescriptor(remoteNode);
174 return rld!=NULL ? rld->remoteNode : NodeID::UNSPECIFIED;
175}
176
177/// routes a message over the overlay or directly sends it when a link is open
178seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid, const NodeID& remoteRelay ) {
179 /// send message directly to a neighbor
180 for (size_t i=0; i<links.size(); i++)
181 if ( !links[i]->relay &&
182 links[i]->up &&
183 links[i]->communicationUp &&
184 links[i]->keepAliveMissed <= 1 &&
185 links[i]->remoteNode == nodeid &&
186 links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
187
188 // mark as relay and send message
189 links[i]->markAsRelay();
190 return sendMessage( message, links[i] );
191 }
192
193 /// send relayed message over the overlay
194 if (!remoteRelay.isUnspecified()) {
195 // create a information relay message to inform the relay about
196 OverlayMsg overlay_msg(
197 OverlayMsg::typeRelay, OverlayInterface::OVERLAY_SERVICE_ID, nodeId);
198 RelayMessage relayMsg( RelayMessage::typeInform, remoteRelay, nodeid, LinkID::UNSPECIFIED );
199 relayMsg.encapsulate( message );
200 overlay_msg.encapsulate( &relayMsg );
201
202 // get local relay link
203 LinkDescriptor* rld = getRelayDescriptor(nodeid);
204
205 // local relay available? send to local relay!
206 if (rld!=NULL) {
207 rld->markAsRelay();
208 sendMessage(&overlay_msg, rld);
209 } else
210 overlayInterface->routeMessage(remoteRelay, &overlay_msg);
211
212 // finished
213 return 0;
214 }
215
216 // common case: send message over the overlay
217 overlayInterface->routeMessage(nodeid, message);
218 return 0;
219}
220
221/// forwards a message over relays/directly using link descriptor
222seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
223
224 // directly send message
225 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
226 logging_debug("Send: Sending message via Base Communication");
227 return bc->sendMessage( ld->communicationId, message );
228 }
229
230 // relay message
231 else if ( ld->relay ) {
232
233 // sending a relayed message
234 logging_debug("Send: Relaying message to node "
235 << ld->remoteNode.toString()
236 << " using relay " << ld->localRelay
237 );
238
239 // create a information relay message to inform the relay about
240 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
241 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
242 relayMsg.encapsulate( message );
243 overlay_msg.encapsulate( &relayMsg );
244
245 // route message to relay node in order to inform it!
246 logging_debug("sendMessage: Sending message over relayed link with" << ld );
247 sendOverlay( &overlay_msg, ld->localRelay );
248 return 0;
249 }
250
251 // error
252 else {
253 logging_error( "Could not send message descriptor=" << ld );
254 return -1;
255 }
256 return -1;
257}
258
259/// creates a link descriptor, apply relay semantics if possible
260LinkDescriptor* BaseOverlay::createLinkDescriptor(
261 const NodeID remoteNode, const ServiceID service, const LinkID link_id ) {
262
263 // find listener
264 if( !communicationListeners.contains( service ) ) {
265 logging_error( "No listener found for service " << service.toString() );
266 return NULL;
267 }
268 CommunicationListener* listener = communicationListeners.get( service );
269 assert( listener != NULL );
270
271 // copy link id
272 LinkID linkid = link_id;
273
274 // create link id if necessary
275 if ( linkid.isUnspecified() )
276 linkid = LinkID::create();
277
278 // create relay link descriptor
279 NodeID relayNode = getRelayNode(remoteNode);
280
281 // add descriptor
282 LinkDescriptor* ld = addDescriptor( linkid );
283 ld->overlayId = linkid;
284 ld->service = service;
285 ld->listener = listener;
286 ld->remoteNode = remoteNode;
287
288 // set relay node if available
289 ld->relay = !relayNode.isUnspecified();
290 ld->localRelay = relayNode;
291
292 if (!ld->relay)
293 logging_error("No relay found!");
294
295 // debug output
296 logging_debug( "Created link descriptor: " << ld );
297
298 return ld;
299}
300
301
302// ----------------------------------------------------------------------------
303
// NOTE(review): presumably instantiates the logger that the logging_*
// macros in this file use for class BaseOverlay -- confirm against the
// logging header.
use_logging_cpp(BaseOverlay);
305
306// ----------------------------------------------------------------------------
307
308BaseOverlay::BaseOverlay() :
309 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
310 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
311 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
312}
313
314BaseOverlay::~BaseOverlay() {
315}
316
317// ----------------------------------------------------------------------------
318
319void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
320 logging_info("Starting...");
321
322 // set parameters
323 bc = &_basecomm;
324 nodeId = _nodeid;
325
326 // register at base communication
327 bc->registerMessageReceiver( this );
328 bc->registerEventListener( this );
329
330 // timer for auto link management
331 Timer::setInterval( 1000 );
332 Timer::start();
333
334 started = true;
335 state = BaseOverlayStateInvalid;
336}
337
338void BaseOverlay::stop() {
339 logging_info("Stopping...");
340
341 // stop timer
342 Timer::stop();
343
344 // delete oberlay interface
345 if(overlayInterface != NULL) {
346 delete overlayInterface;
347 overlayInterface = NULL;
348 }
349
350 // unregister at base communication
351 bc->unregisterMessageReceiver( this );
352 bc->unregisterEventListener( this );
353
354 started = false;
355 state = BaseOverlayStateInvalid;
356}
357
358bool BaseOverlay::isStarted(){
359 return started;
360}
361
362// ----------------------------------------------------------------------------
363
364void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
365 const EndpointDescriptor& bootstrapEp) {
366
367 if(id != spovnetId){
368 logging_error("attempt to join against invalid spovnet, call initiate first");
369 return;
370 }
371
372
373 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
374 logging_info( "Starting to join spovnet " << id.toString() <<
375 " with nodeid " << nodeId.toString());
376
377 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
378
379 // bootstrap against ourselfs
380 logging_debug("joining spovnet locally");
381
382 overlayInterface->joinOverlay();
383 state = BaseOverlayStateCompleted;
384 BOOST_FOREACH( NodeListener* i, nodeListeners )
385 i->onJoinCompleted( spovnetId );
386
387 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
388 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
389
390 logging_debug("starting overlay bootstrap module");
391 overlayBootstrap.start(this, spovnetId, nodeId);
392 overlayBootstrap.publish(bc->getEndpointDescriptor());
393
394 } else {
395
396 // bootstrap against another node
397 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
398
399 const LinkID& lnk = bc->establishLink( bootstrapEp );
400 bootstrapLinks.push_back(lnk);
401 logging_info("join process initiated for " << id.toString() << "...");
402 }
403}
404
405void BaseOverlay::leaveSpoVNet() {
406
407 logging_info( "Leaving spovnet " << spovnetId );
408 bool ret = ( state != this->BaseOverlayStateInvalid );
409
410 logging_debug("stopping overlay bootstrap module");
411 overlayBootstrap.stop();
412 overlayBootstrap.revoke();
413
414 logging_debug( "Dropping all auto-links" );
415
416 // gather all service links
417 vector<LinkID> servicelinks;
418 BOOST_FOREACH( LinkDescriptor* ld, links ) {
419 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
420 servicelinks.push_back( ld->overlayId );
421 }
422
423 // drop all service links
424 BOOST_FOREACH( LinkID lnk, servicelinks )
425 dropLink( lnk );
426
427 // let the node leave the spovnet overlay interface
428 logging_debug( "Leaving overlay" );
429 if( overlayInterface != NULL )
430 overlayInterface->leaveOverlay();
431
432 // drop still open bootstrap links
433 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
434 bc->dropLink( lnk );
435
436 // change to inalid state
437 state = BaseOverlayStateInvalid;
438 //ovl.visShutdown( ovlId, nodeId, string("") );
439
440 // inform all registered services of the event
441 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
442 if( ret ) i->onLeaveCompleted( spovnetId );
443 else i->onLeaveFailed( spovnetId );
444 }
445}
446
447void BaseOverlay::createSpoVNet(const SpoVNetID& id,
448 const OverlayParameterSet& param,
449 const SecurityParameterSet& sec,
450 const QoSParameterSet& qos) {
451
452 // set the state that we are an initiator, this way incoming messages are
453 // handled correctly
454 logging_info( "creating spovnet " + id.toString() <<
455 " with nodeid " << nodeId.toString() );
456
457 spovnetId = id;
458
459 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
460 if( overlayInterface == NULL ) {
461 logging_fatal( "overlay structure not supported" );
462 state = BaseOverlayStateInvalid;
463
464 BOOST_FOREACH( NodeListener* i, nodeListeners )
465 i->onJoinFailed( spovnetId );
466
467 return;
468 }
469}
470
471// ----------------------------------------------------------------------------
472
473const LinkID BaseOverlay::establishLink(
474 const EndpointDescriptor& ep, const NodeID& nodeid,
475 const ServiceID& service, const NodeID& remoteRelay, const LinkID& linkid ) {
476
477 LinkID link_id = linkid;
478
479 // establish link via overlay
480 if (!nodeid.isUnspecified())
481 link_id = establishLink( nodeid, service, remoteRelay, link_id );
482
483 // establish link directly if only ep is known
484 if (nodeid.isUnspecified())
485 establishDirectLink( ep, service, link_id );
486
487 return link_id;
488}
489
490/// call base communication's establish link and add link mapping
491const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
492 const ServiceID& service, const LinkID& linkid ) {
493
494 // create a new link id if necessary
495 LinkID link_id = linkid;
496 if (link_id.isUnspecified()) link_id = LinkID::create();
497
498 /// find a service listener
499 if( !communicationListeners.contains( service ) ) {
500 logging_error( "No listener registered for service id=" << service.toString() );
501 return LinkID::UNSPECIFIED;
502 }
503 CommunicationListener* listener = communicationListeners.get( service );
504 assert( listener != NULL );
505
506 /// establish link and add mapping
507 logging_info("Establishing direct link " << link_id.toString()
508 << " using " << ep.toString());
509
510 // create descriptor
511 LinkDescriptor* ld = addDescriptor( link_id );
512 ld->overlayId = link_id;
513 ld->communicationId = link_id;
514 ld->listener = listener;
515 ld->service = service;
516 bc->establishLink( ep, link_id );
517
518 return link_id;
519}
520
521/// establishes a link between two arbitrary nodes
522const LinkID BaseOverlay::establishLink( const NodeID& node,
523 const ServiceID& service, const NodeID& remoteRelay, const LinkID& link_id ) {
524
525 // do not establish a link to myself!
526 if (node == nodeId) return LinkID::UNSPECIFIED;
527
528 // create a link descriptor
529 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
530 ld->remoteRelay = remoteRelay;
531
532 // create link request message with own link id
533 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
534 LinkRequest link_request_msg(
535 nonce, &bc->getEndpointDescriptor(), false,
536 ld->overlayId, ld->localRelay );
537 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
538 overlay_msg.encapsulate( &link_request_msg );
539 pendingLinks.insert( make_pair(nonce, ld->overlayId) );
540
541 // debug message
542 logging_debug(
543 "Sending link request with"
544 << " link id=" << ld->overlayId
545 << " node id=" << ld->remoteNode.toString()
546 << " service id=" << ld->service.toString()
547 << " local relay id=" << ld->localRelay.toString()
548 << " nonce= " << nonce
549 );
550
551 // sending message through new link
552 sendMessage( &overlay_msg, ld );
553
554 return ld->overlayId;
555}
556
557/// drops an established link
558void BaseOverlay::dropLink(const LinkID& link) {
559 logging_debug( "Dropping link (initiated locally):" << link.toString() );
560
561 // find the link item to drop
562 LinkDescriptor* ld = getDescriptor(link);
563 if( ld == NULL ) {
564 logging_warn( "Can't drop link, link is unknown!");
565 return;
566 }
567
568 // delete all queued messages
569 if( ld->messageQueue.size() > 0 ) {
570 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
571 << ld->messageQueue.size() << " waiting messages" );
572 ld->flushQueue();
573 }
574
575 // inform sideport and listener
576 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
577 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
578
579 // do not drop relay links
580 if (!ld->usedAsRelay) {
581 // drop the link in base communication
582 if (ld->communicationUp) bc->dropLink( ld->communicationId );
583
584 // erase descriptor
585 eraseDescriptor( ld->overlayId );
586 } else
587 ld->dropWhenRelaysLeft = true;
588}
589
590// ----------------------------------------------------------------------------
591
592/// internal send message, always use this functions to send messages over links
593seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
594 logging_debug( "Sending data message on link " << link.toString() );
595
596 // get the mapping for this link
597 LinkDescriptor* ld = getDescriptor(link);
598 if( ld == NULL ) {
599 logging_error("Could not send message. "
600 << "Link not found id=" << link.toString());
601 return -1;
602 }
603
604 // check if the link is up yet, if its an auto link queue message
605 if( !ld->up ) {
606 ld->markAsUsed();
607 if( ld->autolink ) {
608 logging_info("Auto-link " << link.toString() << " not up, queue message");
609 Data data = data_serialize( message );
610 const_cast<Message*>(message)->dropPayload();
611 ld->messageQueue.push_back( new Message(data) );
612 } else {
613 logging_error("Link " << link.toString() << " not up, drop message");
614 }
615 return -1;
616 }
617
618 // compile overlay message (has service and node id)
619 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
620 overmsg.encapsulate( const_cast<Message*>(message) );
621
622 // send message over relay/direct/overlay
623 return sendMessage( &overmsg, ld );
624}
625
626seqnum_t BaseOverlay::sendMessage(const Message* message,
627 const NodeID& node, const ServiceID& service) {
628
629 // find link for node and service
630 LinkDescriptor* ld = getAutoDescriptor( node, service );
631
632 // if we found no link, create an auto link
633 if( ld == NULL ) {
634
635 // debug output
636 logging_info( "No link to send message to node "
637 << node.toString() << " found for service "
638 << service.toString() << ". Creating auto link ..."
639 );
640
641 // this will call onlinkup on us, if everything worked we now have a mapping
642 LinkID link = LinkID::create();
643
644 // call base overlay to create a link
645 link = establishLink( node, service, NodeID::UNSPECIFIED, link );
646 ld = getDescriptor( link );
647 if( ld == NULL ) {
648 logging_error( "Failed to establish auto-link.");
649 return -1;
650 }
651 ld->autolink = true;
652
653 logging_debug( "Auto-link establishment in progress to node "
654 << node.toString() << " with link id=" << link.toString() );
655 }
656 assert(ld != NULL);
657
658 // mark the link as used, as we now send a message through it
659 ld->markAsUsed();
660
661 // send / queue message
662 return sendMessage( message, ld->overlayId );
663}
664
665// ----------------------------------------------------------------------------
666
667const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
668 const LinkID& link) const {
669
670 // return own end-point descriptor
671 if( link == LinkID::UNSPECIFIED )
672 return bc->getEndpointDescriptor();
673
674 // find link descriptor. not found -> return unspecified
675 const LinkDescriptor* ld = getDescriptor(link);
676 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
677
678 // return endpoint-descriptor from base communication
679 return bc->getEndpointDescriptor( ld->communicationId );
680}
681
682const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
683 const NodeID& node) const {
684
685 // return own end-point descriptor
686 if( node == nodeId || node == NodeID::UNSPECIFIED )
687 return bc->getEndpointDescriptor();
688
689 // no joined and request remote descriptor? -> fail!
690 if( overlayInterface == NULL ) {
691 logging_error( "overlay interface not set, cannot resolve endpoint" );
692 return EndpointDescriptor::UNSPECIFIED();
693 }
694
695 // resolve end-point descriptor from the base-overlay routing table
696 return overlayInterface->resolveNode( node );
697}
698
699// ----------------------------------------------------------------------------
700
701bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
702 sideport = _sideport;
703 _sideport->configure( this );
704}
705
706bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
707 sideport = &SideportListener::DEFAULT;
708}
709
710// ----------------------------------------------------------------------------
711
712bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
713 logging_debug( "binding communication listener " << listener
714 << " on serviceid " << sid.toString() );
715
716 if( communicationListeners.contains( sid ) ) {
717 logging_error( "some listener already registered for service id "
718 << sid.toString() );
719 return false;
720 }
721
722 communicationListeners.registerItem( listener, sid );
723 return true;
724}
725
726
727bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
728 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
729
730 if( !communicationListeners.contains( sid ) ) {
731 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
732 return false;
733 }
734
735 if( communicationListeners.get(sid) != listener ) {
736 logging_warn( "listener bound to service id " << sid.toString()
737 << " is different than listener trying to unbind" );
738 return false;
739 }
740
741 communicationListeners.unregisterItem( sid );
742 return true;
743}
744
745// ----------------------------------------------------------------------------
746
747bool BaseOverlay::bind(NodeListener* listener) {
748 logging_debug( "Binding node listener " << listener );
749
750 // already bound? yes-> warning
751 NodeListenerVector::iterator i =
752 find( nodeListeners.begin(), nodeListeners.end(), listener );
753 if( i != nodeListeners.end() ) {
754 logging_warn("Node listener " << listener << " is already bound!" );
755 return false;
756 }
757
758 // no-> add
759 nodeListeners.push_back( listener );
760 return true;
761}
762
763bool BaseOverlay::unbind(NodeListener* listener) {
764 logging_debug( "Unbinding node listener " << listener );
765
766 // already unbound? yes-> warning
767 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
768 if( i == nodeListeners.end() ) {
769 logging_warn( "Node listener " << listener << " is not bound!" );
770 return false;
771 }
772
773 // no-> remove
774 nodeListeners.erase( i );
775 return true;
776}
777
778// ----------------------------------------------------------------------------
779
780void BaseOverlay::onLinkUp(const LinkID& id,
781 const address_v* local, const address_v* remote) {
782 logging_debug( "Link up with base communication link id=" << id );
783
784 // get descriptor for link
785 LinkDescriptor* ld = getDescriptor(id, true);
786
787 // handle bootstrap link we initiated
788 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
789 logging_info(
790 "Join has been initiated by me and the link is now up. " <<
791 "Sending out join request for SpoVNet " << spovnetId.toString()
792 );
793
794 // send join request message
795 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
796 JoinRequest joinRequest( spovnetId, nodeId );
797 overlayMsg.encapsulate( &joinRequest );
798 bc->sendMessage( id, &overlayMsg );
799 return;
800 }
801
802 // no link found? -> link establishment from remote, add one!
803 if (ld == NULL) {
804 ld = addDescriptor( id );
805 logging_debug( "onLinkUp (remote request) descriptor: " << ld );
806
807 // update descriptor
808 ld->fromRemote = true;
809 ld->communicationId = id;
810 ld->communicationUp = true;
811 ld->markAsUsed();
812
813 // in this case, do not inform listener, since service it unknown
814 // -> wait for update message!
815
816 // link mapping found? -> send update message with node-id and service id
817 } else {
818 logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
819
820 // note: necessary to validate the link on the remote side!
821 logging_debug( "Sending out update" <<
822 " for service " << ld->service.toString() <<
823 " with local node id " << nodeId.toString() <<
824 " on link " << ld->overlayId.toString() );
825
826 // update descriptor
827 ld->markAsUsed();
828 ld->communicationUp = true;
829
830 // if link is a relayed link ->convert to direct link
831 if (ld->relay && !ld->remoteLinkId.isUnspecified() ) {
832 logging_info( "Converting to direct link: " << ld );
833 ld->up = true;
834 ld->relay = false;
835 ld->localRelay = NodeID::UNSPECIFIED;
836 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
837 overMsg.setRelayLink( ld->remoteLinkId );
838 bc->sendMessage( ld->communicationId, &overMsg );
839 }
840
841 // compile and send update message
842 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
843 overlayMsg.setAutoLink( ld->autolink );
844 bc->sendMessage( ld->communicationId, &overlayMsg );
845 }
846}
847
848void BaseOverlay::onLinkDown(const LinkID& id,
849 const address_v* local, const address_v* remote) {
850
851 // erase bootstrap links
852 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
853 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
854
855 // get descriptor for link
856 LinkDescriptor* ld = getDescriptor(id, true);
857 if ( ld == NULL ) return; // not found? ->ignore!
858 logging_info( "onLinkDown descriptor: " << ld );
859
860 // inform listeners about link down
861 ld->communicationUp = false;
862 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
863 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
864
865 // delete all queued messages (auto links)
866 if( ld->messageQueue.size() > 0 ) {
867 logging_warn( "Dropping link " << id.toString() << " that has "
868 << ld->messageQueue.size() << " waiting messages" );
869 ld->flushQueue();
870 }
871
872 // erase mapping
873 eraseDescriptor(ld->overlayId);
874}
875
876void BaseOverlay::onLinkChanged(const LinkID& id,
877 const address_v* oldlocal, const address_v* newlocal,
878 const address_v* oldremote, const address_v* newremote) {
879
880 // get descriptor for link
881 LinkDescriptor* ld = getDescriptor(id, true);
882 if ( ld == NULL ) return; // not found? ->ignore!
883 logging_debug( "onLinkChanged descriptor: " << ld );
884
885 // inform listeners
886 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
887 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
888
889 // autolinks: refresh timestamp
890 ld->markAsUsed();
891}
892
893void BaseOverlay::onLinkFail(const LinkID& id,
894 const address_v* local, const address_v* remote) {
895 logging_debug( "Link fail with base communication link id=" << id );
896
897 // erase bootstrap links
898 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
899 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
900
901 // get descriptor for link
902 LinkDescriptor* ld = getDescriptor(id, true);
903 if ( ld == NULL ) return; // not found? ->ignore!
904 logging_debug( "Link failed id=" << ld->overlayId.toString() );
905
906 // inform listeners
907 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
908 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
909
910 // autolinks: refresh timestamp
911 ld->markAsUsed();
912}
913
914void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
915 const address_v* remote, const QoSParameterSet& qos) {
916 logging_debug( "Link quality changed with base communication link id=" << id );
917
918 // get descriptor for link
919 LinkDescriptor* ld = getDescriptor(id, true);
920 if ( ld == NULL ) return; // not found? ->ignore!
921 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
922
923 // autolinks: refresh timestamp
924 ld->markAsUsed();
925}
926
927bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
928 const address_v* remote ) {
929 logging_debug("Accepting link request from " << remote->to_string() );
930 return true;
931}
932
933/// handles a message from base communication
934bool BaseOverlay::receiveMessage(const Message* message,
935 const LinkID& link, const NodeID& ) {
936 // get descriptor for link
937 LinkDescriptor* ld = getDescriptor( link, true );
938
939 // link known?
940 if (ld == NULL) { // no-> handle with unspecified params
941 logging_debug("Received message from base communication, link descriptor unknown" );
942 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
943 } else { // yes -> handle with overlay link id
944 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
945 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
946 }
947}
948
949// ----------------------------------------------------------------------------
950
951/// handles a message from an overlay
952void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
953 logging_debug("Received message from overlay -- "
954 << " link id=" << link.toString()
955 << " node id=" << source.toString() );
956 handleMessage( msg, link, LinkID::UNSPECIFIED, source );
957}
958
959// ----------------------------------------------------------------------------
960
961/// handles an incoming message
962bool BaseOverlay::handleMessage( const Message* message,
963 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
964 logging_debug( "Handling message: " << message->toString());
965
966 bool ret = false;
967
968 // decapsulate overlay message
969 OverlayMsg* overlayMsg =
970 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
971 if( overlayMsg == NULL ) return false;
972
973 // mark the link as in action
974 LinkDescriptor* ld = getDescriptor(boLink);
975 if (ld == NULL) ld = getDescriptor(bcLink, true);
976 if (ld != NULL) {
977 ld->markAsUsed();
978 ld->markAlive();
979 }
980
981 switch ( overlayMsg->getType() ) {
982 // ---------------------------------------------------------------------
983 // Handle spovnet instance join requests
984 // ---------------------------------------------------------------------
985 case OverlayMsg::typeJoinRequest: {
986
987 // decapsulate message
988 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
989 logging_info( "Received join request for spovnet " <<
990 joinReq->getSpoVNetID().toString() );
991
992 // check spovnet id
993 if( joinReq->getSpoVNetID() != spovnetId ) {
994 logging_error(
995 "Received join request for spovnet we don't handle " <<
996 joinReq->getSpoVNetID().toString() );
997 ret = false;
998 break;
999 }
1000
1001 // TODO: here you can implement mechanisms to deny joining of a node
1002 bool allow = true;
1003 logging_info( "Sending join reply for spovnet " <<
1004 spovnetId.toString() << " to node " <<
1005 overlayMsg->getSourceNode().toString() <<
1006 ". Result: " << (allow ? "allowed" : "denied") );
1007 joiningNodes.push_back( overlayMsg->getSourceNode() );
1008
1009 // return overlay parameters
1010 assert( overlayInterface != NULL );
1011 logging_debug( "Using bootstrap end-point "
1012 << getEndpointDescriptor().toString() )
1013 OverlayParameterSet parameters = overlayInterface->getParameters();
1014 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
1015 JoinReply replyMsg( spovnetId, parameters,
1016 allow, getEndpointDescriptor() );
1017 retmsg.encapsulate(&replyMsg);
1018 bc->sendMessage( bcLink, &retmsg );
1019 ret = true;
1020 break;
1021 }
1022
1023 // ---------------------------------------------------------------------
1024 // handle replies to spovnet instance join requests
1025 // ---------------------------------------------------------------------
1026 case OverlayMsg::typeJoinReply: {
1027
1028 // decapsulate message
1029 logging_debug("received join reply message");
1030 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1031
1032 // correct spovnet?
1033 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1034 logging_error( "Received SpoVNet join reply for " <<
1035 replyMsg->getSpoVNetID().toString() <<
1036 " != " << spovnetId.toString() );
1037 ret = false;
1038 delete replyMsg;
1039 break;
1040 }
1041
1042 // access granted? no -> fail
1043 if( !replyMsg->getJoinAllowed() ) {
1044 logging_error( "Our join request has been denied" );
1045
1046 // drop initiator link
1047 if(bcLink != LinkID::UNSPECIFIED){
1048 bc->dropLink( bcLink );
1049
1050 vector<LinkID>::iterator it = std::find(
1051 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1052 if( it != bootstrapLinks.end() )
1053 bootstrapLinks.erase(it);
1054 }
1055
1056 // inform all registered services of the event
1057 BOOST_FOREACH( NodeListener* i, nodeListeners )
1058 i->onJoinFailed( spovnetId );
1059
1060 ret = true;
1061 delete replyMsg;
1062 break;
1063 }
1064
1065 // access has been granted -> continue!
1066 logging_info("Join request has been accepted for spovnet " <<
1067 spovnetId.toString() );
1068
1069 logging_debug( "Using bootstrap end-point "
1070 << replyMsg->getBootstrapEndpoint().toString() );
1071
1072 //
1073 // create overlay structure from spovnet parameter set
1074 // if we have not boostrapped yet against some other node
1075 //
1076
1077 if( overlayInterface == NULL ){
1078
1079 logging_debug("first-time bootstrapping");
1080
1081 overlayInterface = OverlayFactory::create(
1082 *this, replyMsg->getParam(), nodeId, this );
1083
1084 // overlay structure supported? no-> fail!
1085 if( overlayInterface == NULL ) {
1086 logging_error( "overlay structure not supported" );
1087
1088 if(bcLink != LinkID::UNSPECIFIED){
1089 bc->dropLink( bcLink );
1090
1091 vector<LinkID>::iterator it = std::find(
1092 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1093 if( it != bootstrapLinks.end() )
1094 bootstrapLinks.erase(it);
1095 }
1096
1097 // inform all registered services of the event
1098 BOOST_FOREACH( NodeListener* i, nodeListeners )
1099 i->onJoinFailed( spovnetId );
1100
1101 delete replyMsg;
1102 ret = true;
1103 break;
1104 }
1105
1106 // everything ok-> join the overlay!
1107 state = BaseOverlayStateCompleted;
1108 overlayInterface->createOverlay();
1109
1110 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1111
1112 // update ovlvis
1113 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1114
1115 // inform all registered services of the event
1116 BOOST_FOREACH( NodeListener* i, nodeListeners )
1117 i->onJoinCompleted( spovnetId );
1118
1119 delete replyMsg;
1120
1121 } else {
1122
1123 // this is not the first bootstrap, just join the additional node
1124 logging_debug("not first-time bootstrapping");
1125 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1126
1127 delete replyMsg;
1128
1129 } // if( overlayInterface == NULL )
1130
1131 ret = true;
1132 break;
1133 }
1134
1135 // ---------------------------------------------------------------------
1136 // handle data forward messages
1137 // ---------------------------------------------------------------------
1138 case OverlayMsg::typeData: {
1139
1140 // get service
1141 const ServiceID& service = overlayMsg->getService();
1142 logging_debug( "received data for service " << service.toString() );
1143
1144 // find listener
1145 CommunicationListener* listener =
1146 communicationListeners.get( service );
1147 if( listener == NULL ) {
1148 ret = true;
1149 break;
1150 }
1151
1152 // delegate data message
1153 listener->onMessage( overlayMsg,
1154 overlayMsg->getSourceNode(), ld->overlayId );
1155
1156 ret = true;
1157 break;
1158 }
1159
1160 // ---------------------------------------------------------------------
1161 // handle update messages for link establishment
1162 // ---------------------------------------------------------------------
1163 case OverlayMsg::typeUpdate: {
1164 // get info
1165 const NodeID& sourcenode = overlayMsg->getSourceNode();
1166 const ServiceID& service = overlayMsg->getService();
1167
1168 // no link descriptor available -> error!
1169 if( ld == NULL ) {
1170 logging_warn( "received overlay update message for link for "
1171 << "which we have no mapping" );
1172 ret = false;
1173 break;
1174 }
1175 logging_debug("Received type update message on link " << ld );
1176
1177 // update our link mapping information for this link
1178 bool changed =
1179 ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1180 ld->remoteNode = sourcenode;
1181 ld->service = service;
1182 ld->autolink = overlayMsg->isAutoLink();
1183
1184 // if our link information changed, we send out an update, too
1185 if( changed ) {
1186 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1187 overMsg.setAutoLink(ld->autolink);
1188 bc->sendMessage( ld->communicationId, &overMsg );
1189 }
1190
1191 // service registered? no-> error!
1192 if( !communicationListeners.contains( service ) ) {
1193 logging_warn( "Link up: event listener has not been registered" );
1194 ret = false;
1195 break;
1196 }
1197
1198 // default or no service registered?
1199 CommunicationListener* listener = communicationListeners.get( service );
1200 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1201 logging_warn("Link up: event listener is default or null!" );
1202 ret = true;
1203 break;
1204 }
1205
1206 // update descriptor
1207 ld->listener = listener;
1208 ld->markAsUsed();
1209 ld->markAlive();
1210
1211 // ask the service whether it wants to accept this link
1212 if( !listener->onLinkRequest(sourcenode) ) {
1213
1214 logging_debug("Link id=" << ld->overlayId.toString() <<
1215 " has been denied by service " << service.toString() << ", dropping link");
1216
1217 // prevent onLinkDown calls to the service
1218 ld->listener = &CommunicationListener::DEFAULT;
1219
1220 // drop the link
1221 dropLink( ld->overlayId );
1222 ret = true;
1223 break;
1224 }
1225
1226 // set link up
1227 ld->up = true;
1228 logging_debug(
1229 "Link " << ld->overlayId.toString()
1230 << " has been accepted by service " << service.toString()
1231 << " and is now up"
1232 );
1233
1234 // auto links: link has been accepted -> send queued messages
1235 if( ld->messageQueue.size() > 0 ) {
1236 logging_info( "sending out queued messages on link " <<
1237 ld->overlayId.toString() );
1238 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1239 sendMessage( msg, ld->overlayId );
1240 delete msg;
1241 }
1242 ld->messageQueue.clear();
1243 }
1244
1245 // call the notification functions
1246 listener->onLinkUp( ld->overlayId, sourcenode );
1247 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1248
1249 ret = true;
1250 break;
1251 }
1252
1253 // ---------------------------------------------------------------------
1254 // handle link request forwarded through the overlay
1255 // ---------------------------------------------------------------------
1256 case OverlayMsg::typeLinkRequest: {
1257
1258 logging_debug( "received link request on link" );
1259
1260 // decapsulate message
1261 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1262 const ServiceID& service = overlayMsg->getService();
1263
1264 // is request reply?
1265 if ( linkReq->isReply() ) {
1266
1267 // find link
1268 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1269 if ( i == pendingLinks.end() ) {
1270 logging_error( "Nonce not found in link request" );
1271 ret = true;
1272 break;
1273 }
1274
1275 // debug message
1276 logging_debug( "Link request reply received. Establishing link "
1277 << i->second << " to " << (linkReq->getEndpoint()->toString())
1278 << " for service " << service.toString()
1279 << " with nonce " << linkReq->getNonce()
1280 << " using relay " << linkReq->getRelay().toString()
1281 << " and remote link id=" << linkReq->getRemoteLinkId()
1282 );
1283
1284 // get descriptor
1285 LinkDescriptor* ldn = getDescriptor(i->second);
1286 if (ldn==NULL) {
1287 delete linkReq;
1288 ret = true;
1289 break;
1290 }
1291
1292 // check if link request reply has a relay node ...
1293 if (!linkReq->getRelay().isUnspecified()) { // yes->
1294 ldn->up = true;
1295 ldn->relay = true;
1296 if (ldn->localRelay.isUnspecified()) {
1297 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1298 showLinkState();
1299 }
1300 ldn->remoteRelay = linkReq->getRelay();
1301 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1302 ldn->remoteNode = overlayMsg->getSourceNode();
1303
1304 ldn->markAlive();
1305
1306 // compile and send update message
1307 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1308 _overlayMsg.setAutoLink(ldn->autolink);
1309 sendMessage( &_overlayMsg, ldn );
1310
1311 // auto links: link has been accepted -> send queued messages
1312 if( ldn->messageQueue.size() > 0 ) {
1313 logging_info( "Sending out queued messages on link " <<
1314 ldn->overlayId.toString() );
1315 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1316 sendMessage( msg, ldn->overlayId );
1317 delete msg;
1318 }
1319 ldn->messageQueue.clear();
1320 }
1321
1322 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1323
1324 // try to establish a direct link
1325 ldn->communicationId =
1326 bc->establishLink( *linkReq->getEndpoint(), i->second );
1327 }
1328
1329 // no relay node-> use overlay routing
1330 else {
1331 ldn->up = true;
1332
1333 // establish direct link
1334 ldn->communicationId =
1335 bc->establishLink( *linkReq->getEndpoint(), i->second );
1336 }
1337 } else {
1338 logging_debug( "Link request received from node id="
1339 << overlayMsg->getSourceNode() );
1340
1341 // create link descriptor
1342 LinkDescriptor* ldn =
1343 createLinkDescriptor(overlayMsg->getSourceNode(),
1344 overlayMsg->getService(), LinkID::UNSPECIFIED );
1345 assert(!ldn->overlayId.isUnspecified());
1346
1347 // create reply message
1348 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1349 LinkRequest link_request_msg(
1350 linkReq->getNonce(),
1351 &bc->getEndpointDescriptor(),
1352 true, ldn->overlayId, ldn->localRelay
1353 );
1354 overlay_msg.encapsulate( &link_request_msg );
1355
1356 // debug message
1357 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1358 linkReq->getNonce() );
1359
1360 // if this is a relay link-> update information & inform listeners
1361 if (!linkReq->getRelay().isUnspecified()) {
1362 // set flags
1363 ldn->up = true;
1364 ldn->relay = true;
1365 if (ldn->localRelay.isUnspecified()) {
1366 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1367 showLinkState();
1368 }
1369 ldn->remoteRelay = linkReq->getRelay();
1370 ldn->remoteNode = overlayMsg->getSourceNode();
1371 ldn->remoteLinkId = linkReq->getRemoteLinkId();
1372 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1373 }
1374
1375 // route message back over overlay
1376 sendMessage( &overlay_msg, ldn );
1377 }
1378 delete linkReq;
1379 ret = true;
1380 break;
1381 }
1382
1383 // ---------------------------------------------------------------------
1384 // handle relay message to forward messages
1385 // ---------------------------------------------------------------------
1386 case OverlayMsg::typeRelay: {
1387
1388 logging_debug( "received relay request on link" );
1389
1390 // decapsulate message
1391 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1392
1393 // is relay message informative?
1394 switch (relayMsg->getType()) {
1395
1396 // handle relay notification
1397 case RelayMessage::typeInform: {
1398 logging_info("Received relay information message with"
1399 << " relay " << relayMsg->getRelayNode()
1400 << " destination " << relayMsg->getDestNode() );
1401
1402 // mark incoming link as relay
1403 if (ld!=NULL) ld->markAsRelay();
1404
1405 // am I the destination of this message? yes->
1406 if (relayMsg->getDestNode() == nodeId ) {
1407 // deliver relay message locally!
1408 logging_debug("Relay message reached destination. Handling the message.");
1409 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1410 ret = true;
1411 break;
1412 }
1413
1414 // create route message
1415 OverlayMsg _overMsg( *overlayMsg );
1416 RelayMessage _relayMsg( *relayMsg );
1417 _relayMsg.setType( RelayMessage::typeRoute );
1418 _overMsg.encapsulate( &_relayMsg );
1419
1420 // forward message
1421 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1422 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1423 sendOverlay( &_overMsg, relayMsg->getDestNode() );
1424 } else {
1425 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1426 sendOverlay( &_overMsg, relayMsg->getRelayNode() );
1427 }
1428 ret = true;
1429 break;
1430 }
1431
1432 // handle relay routing
1433 case RelayMessage::typeRoute: {
1434 logging_info("Received relay route message with"
1435 << " relay " << relayMsg->getRelayNode()
1436 << " destination " << relayMsg->getDestNode() );
1437
1438 // mark incoming link as relay
1439 if (ld!=NULL) ld->markAsRelay();
1440
1441 // am I the destination of this message? yes->
1442 if (relayMsg->getDestNode() == nodeId ) {
1443 // deliver relay message locally!
1444 logging_debug("Relay message reached destination. Handling the message.");
1445 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1446 ret = true;
1447 break;
1448 }
1449
1450 // am I the relay for this message? yes->
1451 if (relayMsg->getRelayNode() == nodeId ) {
1452 logging_debug("I'm the relay for this message. Sending to destination.");
1453 OverlayMsg _overMsg( *overlayMsg );
1454 RelayMessage _relayMsg( *relayMsg );
1455 _overMsg.encapsulate(&_relayMsg);
1456
1457 /// this must be handled by using relay link!
1458 sendOverlay(&_overMsg, relayMsg->getDestNode());
1459 ret = true;
1460 break;
1461 }
1462
1463 // error: I'm not a relay or destination!
1464 logging_error("This node is neither relay nor destination. Dropping Message!");
1465 ret = true;
1466 break;
1467 }
1468 default: {
1469 logging_error("RelayMessage Unknown!");
1470 ret = true;
1471 break;
1472 }
1473 }
1474 delete relayMsg;
1475 break;
1476 }
1477
1478 // ---------------------------------------------------------------------
1479 // handle keep-alive messages
1480 // ---------------------------------------------------------------------
1481 case OverlayMsg::typeKeepAlive: {
1482 logging_debug( "received keep-alive on link" );
1483 if ( ld != NULL ) {
1484 logging_info("Keep-Alive for "<< ld->overlayId);
1485 ld->markAlive();
1486 }
1487 break;
1488 }
1489
1490 // ---------------------------------------------------------------------
1491 // handle direct link replacement messages
1492 // ---------------------------------------------------------------------
1493 case OverlayMsg::typeDirectLink: {
1494
1495 logging_debug( "Received direct link replacement request" );
1496
1497 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1498 if (rld == NULL || ld == NULL) {
1499 logging_error("Direct link replacement: Link "
1500 << overlayMsg->getRelayLink() << "not found error." );
1501 break;
1502 }
1503 logging_info( "Received direct link convert notification for " << rld );
1504
1505 // set communcation link id and set it up
1506 rld->communicationId = ld->communicationId;
1507
1508 // this is neccessary since this link was a relay link before!
1509 rld->communicationUp = true;
1510
1511 // this is not a relay link anymore!
1512 rld->relay = false;
1513 rld->localRelay = NodeID::UNSPECIFIED;
1514 rld->remoteRelay = NodeID::UNSPECIFIED;
1515
1516 // mark used and alive!
1517 rld->markAsUsed();
1518 rld->markAlive();
1519
1520 // erase the original descriptor
1521 eraseDescriptor(ld->overlayId);
1522 break;
1523 }
1524
1525 // ---------------------------------------------------------------------
1526 // handle unknown message type
1527 // ---------------------------------------------------------------------
1528 default: {
1529 logging_error( "received message in invalid state! don't know " <<
1530 "what to do with this message of type " <<
1531 overlayMsg->getType() );
1532 ret = false;
1533 break;
1534 }
1535 } /* switch */
1536
1537 delete overlayMsg;
1538 return ret;
1539}
1540
1541// ----------------------------------------------------------------------------
1542
1543void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1544
1545 logging_debug( "broadcasting message to all known nodes " <<
1546 "in the overlay from service " + service.toString() );
1547
1548 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1549 OverlayInterface::NodeList::iterator i = nodes.begin();
1550 for(; i != nodes.end(); i++ ) {
1551 if( *i == nodeId) continue; // don't send to ourselfs
1552 sendMessage( message, *i, service );
1553 }
1554}
1555
1556vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1557 // the known nodes _can_ also include our node, so we remove ourself
1558 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1559
1560 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1561 if( i != nodes.end() ) nodes.erase( i );
1562
1563 return nodes;
1564}
1565
1566const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1567 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1568 const LinkDescriptor* ld = getDescriptor(lid);
1569 if( ld == NULL ) return NodeID::UNSPECIFIED;
1570 else return ld->remoteNode;
1571}
1572
1573vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1574 vector<LinkID> linkvector;
1575 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1576 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1577 linkvector.push_back( ld->overlayId );
1578 }
1579 }
1580 return linkvector;
1581}
1582
1583
1584void BaseOverlay::onNodeJoin(const NodeID& node) {
1585 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1586 if( i == joiningNodes.end() ) return;
1587
1588 logging_info( "node has successfully joined baseoverlay and overlay structure "
1589 << node.toString() );
1590
1591 joiningNodes.erase( i );
1592}
1593
1594void BaseOverlay::eventFunction() {
1595
1596 // send keep-alive messages over established links
1597 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1598 if (!ld->up) continue;
1599 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1600 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1601 sendMessage( &overMsg, ld );
1602 }
1603
1604 // iterate over all links and check for time boundaries
1605 vector<LinkDescriptor*> oldlinks;
1606 time_t now = time(NULL);
1607 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1608 // remote used as relay flag
1609 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1610 ld->usedAsRelay = false;
1611
1612 // keep alives and not up? yes-> link connection request is stale!
1613 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1614
1615 // increase counter
1616 ld->keepAliveMissed++;
1617
1618 // missed more than four keep-alive messages (10 sec)? -> drop link
1619 if (ld->keepAliveMissed > 4) {
1620 logging_info( "Link connection request is stale, closing: " << ld );
1621 oldlinks.push_back( ld );
1622 continue;
1623 }
1624 }
1625
1626 if (!ld->up) continue;
1627
1628 // drop links that are dropped and not used as relay
1629 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1630 oldlinks.push_back( ld );
1631 continue;
1632 }
1633
1634 // auto-link time exceeded?
1635 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1636 oldlinks.push_back( ld );
1637 continue;
1638 }
1639
1640 // keep alives missed? yes->
1641 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1642
1643 // increase counter
1644 ld->keepAliveMissed++;
1645
1646 // missed more than four keep-alive messages (4 sec)? -> drop link
1647 if (ld->keepAliveMissed >= 8) {
1648 logging_info( "Link is stale, closing: " << ld );
1649 oldlinks.push_back( ld );
1650 continue;
1651 }
1652 }
1653 }
1654
1655 // drop links
1656 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1657/*
1658 vector<LinkID>::iterator it = std::find(
1659 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1660
1661 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1662 logging_info( "Not dropping initiator link: " << ld );
1663 continue;
1664 }
1665*/
1666 logging_info( "Link timed out. Dropping " << ld );
1667 dropLink( ld->overlayId );
1668 }
1669
1670 // show link state
1671 counter++;
1672 if (counter>=4) showLinkState();
1673 if (counter>=4 || counter<0) counter = 0;
1674}
1675
1676void BaseOverlay::showLinkState() {
1677 int i=0;
1678 logging_info("--- link state -------------------------------");
1679 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1680 logging_info("link " << i << ": " << ld);
1681 i++;
1682 }
1683 logging_info("----------------------------------------------");
1684}
1685
1686}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.