source: source/ariba/overlay/BaseOverlay.cpp@ 5885

Last change on this file since 5885 was 5885, checked in by mies, 15 years ago
File size: 47.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/misc/OvlVis.h"
57
58namespace ariba {
59namespace overlay {
60
61/* *****************************************************************************
62 * PREREQUESITES
63 * ****************************************************************************/
64
65CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
66 if( !communicationListeners.contains( service ) ) {
67 logging_error( "No listener found for service " << service.toString() );
68 return NULL;
69 }
70 CommunicationListener* listener = communicationListeners.get( service );
71 assert( listener != NULL );
72 return listener;
73}
74
75// link descriptor handling ----------------------------------------------------
76
77LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
78 BOOST_FOREACH( LinkDescriptor* lp, links )
79 if ((communication ? lp->communicationId : lp->overlayId) == link)
80 return lp;
81 return NULL;
82}
83
84const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
85 BOOST_FOREACH( const LinkDescriptor* lp, links )
86 if ((communication ? lp->communicationId : lp->overlayId) == link)
87 return lp;
88 return NULL;
89}
90
91/// erases a link descriptor
92void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
93 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
94 LinkDescriptor* ld = *i;
95 if ((communication ? ld->communicationId : ld->overlayId) == link) {
96 delete ld;
97 links.erase(i);
98 break;
99 }
100 }
101}
102
103/// adds a link descriptor
104LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
105 LinkDescriptor* desc = getDescriptor( link );
106 if ( desc == NULL ) {
107 desc = new LinkDescriptor();
108 if (!link.isUnspecified()) desc->overlayId = link;
109 links.push_back(desc);
110 }
111 return desc;
112}
113
114/// returns a auto-link descriptor
115LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
116 // search for a descriptor that is already up
117 BOOST_FOREACH( LinkDescriptor* lp, links )
118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
119 return lp;
120 // if not found, search for one that is about to come up...
121 BOOST_FOREACH( LinkDescriptor* lp, links )
122 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
123 return lp;
124 return NULL;
125}
126
127/// stabilizes link information
128void BaseOverlay::stabilizeLinks() {
129 // send keep-alive messages over established links
130 BOOST_FOREACH( LinkDescriptor* ld, links ) {
131 if (!ld->up) continue;
132 OverlayMsg msg( OverlayMsg::typeLinkAlive,
133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
134 msg.setRelayed(true);
135 if (ld->relayed) msg.setRouteRecord(true);
136 send_link( &msg, ld->overlayId );
137 }
138
139 // iterate over all links and check for time boundaries
140 vector<LinkDescriptor*> oldlinks;
141 time_t now = time(NULL);
142 BOOST_FOREACH( LinkDescriptor* ld, links ) {
143
144 // keep alives and not up? yes-> link connection request is stale!
145 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
146
147 // increase counter
148 ld->keepAliveMissed++;
149
150 // missed more than four keep-alive messages (10 sec)? -> drop link
151 if (ld->keepAliveMissed > 4) {
152 logging_info( "Link connection request is stale, closing: " << ld );
153 oldlinks.push_back( ld );
154 continue;
155 }
156 }
157
158 if (!ld->up) continue;
159
160 // remote used as relay flag
161 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
162 ld->relaying = false;
163
164 // drop links that are dropped and not used as relay
165 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
166 oldlinks.push_back( ld );
167 continue;
168 }
169
170 // auto-link time exceeded?
171 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
172 oldlinks.push_back( ld );
173 continue;
174 }
175
176 // keep alives missed? yes->
177 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
178
179 // increase counter
180 ld->keepAliveMissed++;
181
182 // missed more than four keep-alive messages (4 sec)? -> drop link
183 if (ld->keepAliveMissed >= 4) {
184 logging_info( "Link is stale, closing: " << ld );
185 oldlinks.push_back( ld );
186 continue;
187 }
188 }
189 }
190
191 // drop links
192 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
193 logging_info( "Link timed out. Dropping " << ld );
194 ld->relaying = false;
195 dropLink( ld->overlayId );
196 }
197
198 // show link state
199 counter++;
200 if (counter>=4) showLinks();
201 if (counter>=4 || counter<0) counter = 0;
202}
203
204/// shows the current link state
205void BaseOverlay::showLinks() {
206 int i=0;
207 logging_info("--- link state -------------------------------");
208 BOOST_FOREACH( LinkDescriptor* ld, links ) {
209 logging_info("link " << i << ": " << ld);
210 i++;
211 }
212 logging_info("----------------------------------------------");
213}
214
215/// compares two arbitrary links to the same node
216int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
217 LinkDescriptor* lhsld = getDescriptor(lhs);
218 LinkDescriptor* rhsld = getDescriptor(rhs);
219 if (lhsld==NULL || rhsld==NULL
220 || !lhsld->up || !rhsld->up
221 || lhsld->remoteNode != rhsld->remoteNode) return -1;
222
223 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
224 return -1;
225
226 return 1;
227}
228
229
230// internal message delivery ---------------------------------------------------
231
232/// routes a message to its destination node
233void BaseOverlay::route( OverlayMsg* message ) {
234
235 // exceeded time-to-live? yes-> drop message
236 if (message->getNumHops() > message->getTimeToLive()) {
237 logging_warn("Message exceeded TTL -> drop message!");
238 return;
239
240 // no-> forward message
241 } else {
242 // destinastion myself? yes-> handle message
243 if (message->getDestinationNode() == nodeId) {
244 logging_warn("Usually I should not route messages to myself!");
245 Message msg;
246 msg.encapsulate(message);
247 handleMessage( &msg, NULL );
248 } else {
249 // no->send message to next hop
250 send( message, message->getDestinationNode() );
251 }
252 }
253}
254
255/// sends a message to another node, delivers it to the base overlay class
256seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
257 LinkDescriptor* next_link = NULL;
258
259 // drop messages to unspecified destinations
260 if (destination.isUnspecified()) return -1;
261
262 // send messages to myself -> handle message and drop warning!
263 if (destination == nodeId) {
264 logging_warn("Sent message to myself. Handling message.")
265 Message msg;
266 msg.encapsulate(message);
267 handleMessage( &msg, NULL );
268 return -1;
269 }
270
271 // relay path known? yes-> send over relay link
272 next_link = getRelayLinkTo( destination );
273 if (next_link != NULL) {
274 next_link->setRelaying();
275 return send(message, next_link);
276 }
277
278 // no-> relay path! route over overlay path
279 LinkID next_id = overlayInterface->getNextLinkId( destination );
280 if (next_id.isUnspecified()) {
281 logging_error("Could not send message. No next hop found to " << destination );
282 return -1;
283 }
284
285 // get link descriptor, up and running? yes-> send message
286 next_link = getDescriptor(next_id);
287 if (next_link != NULL && next_link->up)
288 return send(message, next_link);
289
290 // no-> error, dropping message
291 else {
292 logging_error("Could not send message. Link not known or up");
293 return -1;
294 }
295
296 // not reached-> fail
297 return -1;
298}
299
300/// send a message using a link descriptor, delivers it to the base overlay class
301seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ld, bool ignore_down ) {
302 assert(ld!=NULL);
303
304 // check if up
305 if (!ld->up && !ignore_down) {
306 logging_error("Can not send message. Link not up:" << ld );
307 return -1;
308 }
309
310 // handle relayed link
311 if (ld->relayed) {
312 logging_debug("send(): Resolving direct link for relayed link to "
313 << ld->remoteNode);
314 ld = getRelayLinkTo( ld->remoteNode );
315 if (ld==NULL) {
316 logging_error("Direct link not found.");
317 return -1;
318 }
319 message->setRelayed();
320 ld->setRelaying();
321 }
322
323 // handle direct link
324 if (ld->communicationUp) {
325 logging_debug("send(): Sending message over direct link.");
326 return bc->sendMessage( ld->communicationId, message );
327 } else {
328 logging_error("send(): Could not send mesage. "
329 "Not a relayed link and direct link is not up.");
330 return -1;
331 }
332 return -1;
333}
334
335seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
336 const ServiceID& service) {
337 message->setSourceNode(nodeId);
338 message->setDestinationNode(remote);
339 message->setService(service);
340 send( message, remote );
341}
342
343seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
344 LinkDescriptor* ld = getDescriptor(link);
345 if (ld==NULL) {
346 logging_error("Cannot find descriptor to link id=" << link.toString());
347 return -1;
348 }
349 message->setSourceNode(nodeId);
350 message->setDestinationNode(ld->remoteNode);
351
352 message->setSourceLink(ld->overlayId);
353 message->setDestinationLink(ld->remoteLink);
354
355 message->setService(ld->service);
356 return send( message, ld, ignore_down );
357}
358
359// relay route management ------------------------------------------------------
360
361/// stabilize relay information
362void BaseOverlay::stabilizeRelays() {
363 vector<relay_route>::iterator i = relay_routes.begin();
364 while (i!=relay_routes.end() ) {
365 relay_route& route = *i;
366 LinkDescriptor* ld = getDescriptor(route.link);
367 if (ld==NULL
368 || !ld->up
369 || ld->keepAliveMissed != 0
370 || !ld->communicationUp
371 || difftime(route.used, time(NULL)) > 4) {
372 logging_info("Forgetting relay information to node "
373 << route.node.toString() );
374 i = relay_routes.erase(i);
375 } else
376 i++;
377 }
378}
379
380void BaseOverlay::removeRelayLink( const LinkID& link ) {
381 vector<relay_route>::iterator i = relay_routes.begin();
382 while (i!=relay_routes.end() ) {
383 relay_route& route = *i;
384 if (route.link == link ) i = relay_routes.erase(i); else i++;
385 }
386}
387
388void BaseOverlay::removeRelayNode( const NodeID& remote ) {
389 vector<relay_route>::iterator i = relay_routes.begin();
390 while (i!=relay_routes.end() ) {
391 relay_route& route = *i;
392 if (route.node == remote ) i = relay_routes.erase(i); else i++;
393 }
394}
395
396/// refreshes relay information
397void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
398
399 // handle relayed messages from real links only
400 if (ld == NULL
401 || ld->relayed
402 || !message->isRelayed()
403// || !(message->getService() == OverlayInterface::OVERLAY_SERVICE_ID)
404 || message->getSourceNode()==nodeId ) return;
405
406 // check wheter this node is already part of the routing table
407 LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode());
408 if (next_link == ld->overlayId) return;
409 ld->setRelaying();
410
411 // try to find source node
412 BOOST_FOREACH( relay_route& route, relay_routes ) {
413
414 // relay route found? yes->
415 if ( route.node == message->getSourceNode() ) {
416
417 // refresh timer
418 route.used = time(NULL);
419
420 // route has a shorter hop count? yes-> replace
421 if (route.hops > message->getNumHops() ) {
422 logging_info("Updating relay information to node "
423 << route.node.toString()
424 << " reducing to " << message->getNumHops() << " hops.");
425 route.hops = message->getNumHops();
426 route.link = ld->overlayId;
427 }
428 return;
429 }
430 }
431
432 // not found-> add new entry
433 relay_route route;
434 route.hops = message->getNumHops();
435 route.link = ld->overlayId;
436 route.node = message->getSourceNode();
437 route.used = time(NULL);
438 logging_info("Remembering relay information to node " << route.node.toString());
439 relay_routes.push_back(route);
440}
441
442/// returns a known "vital" relay link which is up and running
443LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
444 // try to find source node
445 BOOST_FOREACH( relay_route& route, relay_routes ) {
446 if (route.node == remote ) {
447 LinkDescriptor* ld = getDescriptor( route.link );
448 if (ld==NULL || !ld->up) return NULL; else {
449 route.used = time(NULL);
450 return ld;
451 }
452 }
453 }
454 return NULL;
455}
456
457/* *****************************************************************************
458 * PUBLIC MEMBERS
459 * ****************************************************************************/
460
461use_logging_cpp(BaseOverlay);
462
463// ----------------------------------------------------------------------------
464
465BaseOverlay::BaseOverlay() :
466 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
467 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
468 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
469}
470
471BaseOverlay::~BaseOverlay() {
472}
473
474// ----------------------------------------------------------------------------
475
476void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
477 logging_info("Starting...");
478
479 // set parameters
480 bc = &_basecomm;
481 nodeId = _nodeid;
482
483 // register at base communication
484 bc->registerMessageReceiver( this );
485 bc->registerEventListener( this );
486
487 // timer for auto link management
488 Timer::setInterval( 1000 );
489 Timer::start();
490
491 started = true;
492 state = BaseOverlayStateInvalid;
493}
494
495void BaseOverlay::stop() {
496 logging_info("Stopping...");
497
498 // stop timer
499 Timer::stop();
500
501 // delete oberlay interface
502 if(overlayInterface != NULL) {
503 delete overlayInterface;
504 overlayInterface = NULL;
505 }
506
507 // unregister at base communication
508 bc->unregisterMessageReceiver( this );
509 bc->unregisterEventListener( this );
510
511 started = false;
512 state = BaseOverlayStateInvalid;
513}
514
515bool BaseOverlay::isStarted(){
516 return started;
517}
518
519// ----------------------------------------------------------------------------
520
521void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
522 const EndpointDescriptor& bootstrapEp) {
523
524 if(id != spovnetId){
525 logging_error("attempt to join against invalid spovnet, call initiate first");
526 return;
527 }
528
529
530 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
531 logging_info( "Starting to join spovnet " << id.toString() <<
532 " with nodeid " << nodeId.toString());
533
534 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
535
536 // bootstrap against ourselfs
537 logging_debug("joining spovnet locally");
538
539 overlayInterface->joinOverlay();
540 state = BaseOverlayStateCompleted;
541 BOOST_FOREACH( NodeListener* i, nodeListeners )
542 i->onJoinCompleted( spovnetId );
543
544 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
545 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
546
547 logging_debug("starting overlay bootstrap module");
548 overlayBootstrap.start(this, spovnetId, nodeId);
549 overlayBootstrap.publish(bc->getEndpointDescriptor());
550
551 } else {
552
553 // bootstrap against another node
554 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
555
556 const LinkID& lnk = bc->establishLink( bootstrapEp );
557 bootstrapLinks.push_back(lnk);
558 logging_info("join process initiated for " << id.toString() << "...");
559 }
560}
561
562void BaseOverlay::leaveSpoVNet() {
563
564 logging_info( "Leaving spovnet " << spovnetId );
565 bool ret = ( state != this->BaseOverlayStateInvalid );
566
567 logging_debug("stopping overlay bootstrap module");
568 overlayBootstrap.stop();
569 overlayBootstrap.revoke();
570
571 logging_debug( "Dropping all auto-links" );
572
573 // gather all service links
574 vector<LinkID> servicelinks;
575 BOOST_FOREACH( LinkDescriptor* ld, links ) {
576 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
577 servicelinks.push_back( ld->overlayId );
578 }
579
580 // drop all service links
581 BOOST_FOREACH( LinkID lnk, servicelinks )
582 dropLink( lnk );
583
584 // let the node leave the spovnet overlay interface
585 logging_debug( "Leaving overlay" );
586 if( overlayInterface != NULL )
587 overlayInterface->leaveOverlay();
588
589 // drop still open bootstrap links
590 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
591 bc->dropLink( lnk );
592
593 // change to inalid state
594 state = BaseOverlayStateInvalid;
595 //ovl.visShutdown( ovlId, nodeId, string("") );
596
597 // inform all registered services of the event
598 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
599 if( ret ) i->onLeaveCompleted( spovnetId );
600 else i->onLeaveFailed( spovnetId );
601 }
602}
603
604void BaseOverlay::createSpoVNet(const SpoVNetID& id,
605 const OverlayParameterSet& param,
606 const SecurityParameterSet& sec,
607 const QoSParameterSet& qos) {
608
609 // set the state that we are an initiator, this way incoming messages are
610 // handled correctly
611 logging_info( "creating spovnet " + id.toString() <<
612 " with nodeid " << nodeId.toString() );
613
614 spovnetId = id;
615
616 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
617 if( overlayInterface == NULL ) {
618 logging_fatal( "overlay structure not supported" );
619 state = BaseOverlayStateInvalid;
620
621 BOOST_FOREACH( NodeListener* i, nodeListeners )
622 i->onJoinFailed( spovnetId );
623
624 return;
625 }
626}
627
628// ----------------------------------------------------------------------------
629
630const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
631 const NodeID& remoteId, const ServiceID& service ) {
632
633 // establish link via overlay
634 if (!remoteId.isUnspecified())
635 return establishLink( remoteId, service );
636 else
637
638 // establish link directly if only ep is known
639 if (remoteId.isUnspecified())
640 return establishDirectLink(remoteEp, service );
641
642}
643
644/// call base communication's establish link and add link mapping
645const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
646 const ServiceID& service ) {
647
648 /// find a service listener
649 if( !communicationListeners.contains( service ) ) {
650 logging_error( "No listener registered for service id=" << service.toString() );
651 return LinkID::UNSPECIFIED;
652 }
653 CommunicationListener* listener = communicationListeners.get( service );
654 assert( listener != NULL );
655
656 // create descriptor
657 LinkDescriptor* ld = addDescriptor();
658 ld->relayed = false;
659 ld->listener = listener;
660 ld->service = service;
661 ld->communicationId = bc->establishLink( ep );
662
663 /// establish link and add mapping
664 logging_info("Establishing direct link " << ld->communicationId.toString()
665 << " using " << ep.toString());
666
667 return ld->communicationId;
668}
669
670/// establishes a link between two arbitrary nodes
671const LinkID BaseOverlay::establishLink( const NodeID& remote,
672 const ServiceID& service ) {
673
674 // do not establish a link to myself!
675 if (remote == nodeId) return LinkID::UNSPECIFIED;
676
677 // create a link descriptor
678 LinkDescriptor* ld = addDescriptor();
679 ld->relayed = true;
680 ld->remoteNode = remote;
681 ld->service = service;
682 ld->listener = getListener(ld->service);
683
684 // create link request message
685 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
686 msg.setSourceLink(ld->overlayId);
687 msg.setRelayed(true);
688
689 // debug message
690 logging_info(
691 "Sending link request with"
692 << " link=" << ld->overlayId.toString()
693 << " node=" << ld->remoteNode.toString()
694 << " serv=" << ld->service.toString()
695 );
696
697 // sending message to node
698 send_node( &msg, ld->remoteNode, ld->service );
699
700 return ld->overlayId;
701}
702
703/// drops an established link
704void BaseOverlay::dropLink(const LinkID& link) {
705 logging_info( "Dropping link (initiated locally):" << link.toString() );
706
707 // find the link item to drop
708 LinkDescriptor* ld = getDescriptor(link);
709 if( ld == NULL ) {
710 logging_warn( "Can't drop link, link is unknown!");
711 return;
712 }
713
714 // delete all queued messages
715 if( ld->messageQueue.size() > 0 ) {
716 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
717 << ld->messageQueue.size() << " waiting messages" );
718 ld->flushQueue();
719 }
720
721 // inform sideport and listener
722 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
723 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
724
725 // do not drop relay links
726 if (!ld->relaying) {
727 // drop the link in base communication
728 if (ld->communicationUp) bc->dropLink( ld->communicationId );
729
730 // erase descriptor
731 eraseDescriptor( ld->overlayId );
732 } else {
733 ld->dropAfterRelaying = true;
734 }
735}
736
737// ----------------------------------------------------------------------------
738
739/// internal send message, always use this functions to send messages over links
740seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
741 logging_debug( "Sending data message on link " << link.toString() );
742
743 // get the mapping for this link
744 LinkDescriptor* ld = getDescriptor(link);
745 if( ld == NULL ) {
746 logging_error("Could not send message. "
747 << "Link not found id=" << link.toString());
748 return -1;
749 }
750
751 // check if the link is up yet, if its an auto link queue message
752 if( !ld->up ) {
753 ld->setAutoUsed();
754 if( ld->autolink ) {
755 logging_info("Auto-link " << link.toString() << " not up, queue message");
756 Data data = data_serialize( message );
757 const_cast<Message*>(message)->dropPayload();
758 ld->messageQueue.push_back( new Message(data) );
759 } else {
760 logging_error("Link " << link.toString() << " not up, drop message");
761 }
762 return -1;
763 }
764
765 // compile overlay message (has service and node id)
766 OverlayMsg overmsg( OverlayMsg::typeData );
767 overmsg.encapsulate( const_cast<Message*>(message) );
768
769 // send message over relay/direct/overlay
770 return send_link( &overmsg, ld->overlayId );
771}
772
773seqnum_t BaseOverlay::sendMessage(const Message* message,
774 const NodeID& node, const ServiceID& service) {
775
776 // find link for node and service
777 LinkDescriptor* ld = getAutoDescriptor( node, service );
778
779 // if we found no link, create an auto link
780 if( ld == NULL ) {
781
782 // debug output
783 logging_info( "No link to send message to node "
784 << node.toString() << " found for service "
785 << service.toString() << ". Creating auto link ..."
786 );
787
788 // call base overlay to create a link
789 LinkID link = establishLink( node, service );
790 ld = getDescriptor( link );
791 if( ld == NULL ) {
792 logging_error( "Failed to establish auto-link.");
793 return -1;
794 }
795 ld->autolink = true;
796
797 logging_debug( "Auto-link establishment in progress to node "
798 << node.toString() << " with link id=" << link.toString() );
799 }
800 assert(ld != NULL);
801
802 // mark the link as used, as we now send a message through it
803 ld->setAutoUsed();
804
805 // send / queue message
806 return sendMessage( message, ld->overlayId );
807}
808
809// ----------------------------------------------------------------------------
810
811const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
812 const LinkID& link) const {
813
814 // return own end-point descriptor
815 if( link == LinkID::UNSPECIFIED )
816 return bc->getEndpointDescriptor();
817
818 // find link descriptor. not found -> return unspecified
819 const LinkDescriptor* ld = getDescriptor(link);
820 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
821
822 // return endpoint-descriptor from base communication
823 return bc->getEndpointDescriptor( ld->communicationId );
824}
825
826const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
827 const NodeID& node) const {
828
829 // return own end-point descriptor
830 if( node == nodeId || node == NodeID::UNSPECIFIED )
831 return bc->getEndpointDescriptor();
832
833 // no joined and request remote descriptor? -> fail!
834 if( overlayInterface == NULL ) {
835 logging_error( "overlay interface not set, cannot resolve endpoint" );
836 return EndpointDescriptor::UNSPECIFIED();
837 }
838
839 // resolve end-point descriptor from the base-overlay routing table
840 return overlayInterface->resolveNode( node );
841}
842
843// ----------------------------------------------------------------------------
844
845bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
846 sideport = _sideport;
847 _sideport->configure( this );
848}
849
850bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
851 sideport = &SideportListener::DEFAULT;
852}
853
854// ----------------------------------------------------------------------------
855
856bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
857 logging_debug( "binding communication listener " << listener
858 << " on serviceid " << sid.toString() );
859
860 if( communicationListeners.contains( sid ) ) {
861 logging_error( "some listener already registered for service id "
862 << sid.toString() );
863 return false;
864 }
865
866 communicationListeners.registerItem( listener, sid );
867 return true;
868}
869
870
871bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
872 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
873
874 if( !communicationListeners.contains( sid ) ) {
875 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
876 return false;
877 }
878
879 if( communicationListeners.get(sid) != listener ) {
880 logging_warn( "listener bound to service id " << sid.toString()
881 << " is different than listener trying to unbind" );
882 return false;
883 }
884
885 communicationListeners.unregisterItem( sid );
886 return true;
887}
888
889// ----------------------------------------------------------------------------
890
891bool BaseOverlay::bind(NodeListener* listener) {
892 logging_debug( "Binding node listener " << listener );
893
894 // already bound? yes-> warning
895 NodeListenerVector::iterator i =
896 find( nodeListeners.begin(), nodeListeners.end(), listener );
897 if( i != nodeListeners.end() ) {
898 logging_warn("Node listener " << listener << " is already bound!" );
899 return false;
900 }
901
902 // no-> add
903 nodeListeners.push_back( listener );
904 return true;
905}
906
907bool BaseOverlay::unbind(NodeListener* listener) {
908 logging_debug( "Unbinding node listener " << listener );
909
910 // already unbound? yes-> warning
911 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
912 if( i == nodeListeners.end() ) {
913 logging_warn( "Node listener " << listener << " is not bound!" );
914 return false;
915 }
916
917 // no-> remove
918 nodeListeners.erase( i );
919 return true;
920}
921
922// ----------------------------------------------------------------------------
923
924void BaseOverlay::onLinkUp(const LinkID& id,
925 const address_v* local, const address_v* remote) {
926 logging_debug( "Link up with base communication link id=" << id );
927
928 // get descriptor for link
929 LinkDescriptor* ld = getDescriptor(id, true);
930
931 // handle bootstrap link we initiated
932 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
933 logging_info(
934 "Join has been initiated by me and the link is now up. " <<
935 "Sending out join request for SpoVNet " << spovnetId.toString()
936 );
937
938 // send join request message
939 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
940 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
941 JoinRequest joinRequest( spovnetId, nodeId );
942 overlayMsg.encapsulate( &joinRequest );
943 bc->sendMessage( id, &overlayMsg );
944 return;
945 }
946
947 // no link found? -> link establishment from remote, add one!
948 if (ld == NULL) {
949 ld = addDescriptor( id );
950 logging_info( "onLinkUp (remote request) descriptor: " << ld );
951
952 // update descriptor
953 ld->fromRemote = true;
954 ld->communicationId = id;
955 ld->communicationUp = true;
956 ld->setAutoUsed();
957 ld->setAlive();
958
959 // in this case, do not inform listener, since service it unknown
960 // -> wait for update message!
961
962 // link mapping found? -> send update message with node-id and service id
963 } else {
964 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
965
966 // update descriptor
967 ld->setAutoUsed();
968 ld->setAlive();
969 ld->communicationUp = true;
970 ld->fromRemote = false;
971
972 // if link is a relayed link->convert to direct link
973 if (ld->relayed) {
974 logging_info( "Converting to direct link: " << ld );
975 ld->up = true;
976 ld->relayed = false;
977 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
978 overMsg.setSourceLink( ld->overlayId );
979 overMsg.setDestinationLink( ld->remoteLink );
980 send_link( &overMsg, ld->overlayId );
981 } else {
982 // note: necessary to validate the link on the remote side!
983 logging_info( "Sending out update" <<
984 " for service " << ld->service.toString() <<
985 " with local node id " << nodeId.toString() <<
986 " on link " << ld->overlayId.toString() );
987
988 // compile and send update message
989 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
990 overlayMsg.setSourceLink(ld->overlayId);
991 overlayMsg.setAutoLink( ld->autolink );
992 send_link( &overlayMsg, ld->overlayId, true );
993 }
994 }
995}
996
997void BaseOverlay::onLinkDown(const LinkID& id,
998 const address_v* local, const address_v* remote) {
999
1000 // erase bootstrap links
1001 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1002 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1003
1004 // get descriptor for link
1005 LinkDescriptor* ld = getDescriptor(id, true);
1006 if ( ld == NULL ) return; // not found? ->ignore!
1007 logging_info( "onLinkDown descriptor: " << ld );
1008
1009 // removing relay link information
1010 removeRelayLink(ld->overlayId);
1011
1012 // inform listeners about link down
1013 ld->communicationUp = false;
1014 if (!ld->service.isUnspecified()) {
1015 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1016 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1017 }
1018
1019 // delete all queued messages (auto links)
1020 if( ld->messageQueue.size() > 0 ) {
1021 logging_warn( "Dropping link " << id.toString() << " that has "
1022 << ld->messageQueue.size() << " waiting messages" );
1023 ld->flushQueue();
1024 }
1025
1026 // erase mapping
1027 eraseDescriptor(ld->overlayId);
1028}
1029
1030void BaseOverlay::onLinkChanged(const LinkID& id,
1031 const address_v* oldlocal, const address_v* newlocal,
1032 const address_v* oldremote, const address_v* newremote) {
1033
1034 // get descriptor for link
1035 LinkDescriptor* ld = getDescriptor(id, true);
1036 if ( ld == NULL ) return; // not found? ->ignore!
1037 logging_debug( "onLinkChanged descriptor: " << ld );
1038
1039 // inform listeners
1040 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1041 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1042
1043 // autolinks: refresh timestamp
1044 ld->setAutoUsed();
1045}
1046
1047void BaseOverlay::onLinkFail(const LinkID& id,
1048 const address_v* local, const address_v* remote) {
1049 logging_debug( "Link fail with base communication link id=" << id );
1050
1051 // erase bootstrap links
1052 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1053 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1054
1055 // get descriptor for link
1056 LinkDescriptor* ld = getDescriptor(id, true);
1057 if ( ld == NULL ) return; // not found? ->ignore!
1058 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1059
1060 // inform listeners
1061 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1062 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1063}
1064
1065void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1066 const address_v* remote, const QoSParameterSet& qos) {
1067 logging_debug( "Link quality changed with base communication link id=" << id );
1068
1069 // get descriptor for link
1070 LinkDescriptor* ld = getDescriptor(id, true);
1071 if ( ld == NULL ) return; // not found? ->ignore!
1072 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1073}
1074
1075bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1076 const address_v* remote ) {
1077 logging_debug("Accepting link request from " << remote->to_string() );
1078 return true;
1079}
1080
1081/// handles a message from base communication
1082bool BaseOverlay::receiveMessage(const Message* message,
1083 const LinkID& link, const NodeID& ) {
1084 // get descriptor for link
1085 LinkDescriptor* ld = getDescriptor( link, true );
1086 return handleMessage( message, ld, link );
1087}
1088
1089// ----------------------------------------------------------------------------
1090
1091/// Handle spovnet instance join requests
1092bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1093
1094 // decapsulate message
1095 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1096 logging_info( "Received join request for spovnet " <<
1097 joinReq->getSpoVNetID().toString() );
1098
1099 // check spovnet id
1100 if( joinReq->getSpoVNetID() != spovnetId ) {
1101 logging_error(
1102 "Received join request for spovnet we don't handle " <<
1103 joinReq->getSpoVNetID().toString() );
1104 return false;
1105 }
1106
1107 // TODO: here you can implement mechanisms to deny joining of a node
1108 bool allow = true;
1109 logging_info( "Sending join reply for spovnet " <<
1110 spovnetId.toString() << " to node " <<
1111 overlayMsg->getSourceNode().toString() <<
1112 ". Result: " << (allow ? "allowed" : "denied") );
1113 joiningNodes.push_back( overlayMsg->getSourceNode() );
1114
1115 // return overlay parameters
1116 assert( overlayInterface != NULL );
1117 logging_debug( "Using bootstrap end-point "
1118 << getEndpointDescriptor().toString() )
1119 OverlayParameterSet parameters = overlayInterface->getParameters();
1120 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1121 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1122 JoinReply replyMsg( spovnetId, parameters,
1123 allow, getEndpointDescriptor() );
1124 retmsg.encapsulate(&replyMsg);
1125 bc->sendMessage( bcLink, &retmsg );
1126
1127 return true;
1128}
1129
1130/// Handle replies to spovnet instance join requests
1131bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1132 // decapsulate message
1133 logging_debug("received join reply message");
1134 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1135
1136 // correct spovnet?
1137 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1138 logging_error( "Received SpoVNet join reply for " <<
1139 replyMsg->getSpoVNetID().toString() <<
1140 " != " << spovnetId.toString() );
1141 delete replyMsg;
1142 return false;
1143 }
1144
1145 // access granted? no -> fail
1146 if( !replyMsg->getJoinAllowed() ) {
1147 logging_error( "Our join request has been denied" );
1148
1149 // drop initiator link
1150 if( !bcLink.isUnspecified() ){
1151 bc->dropLink( bcLink );
1152
1153 vector<LinkID>::iterator it = std::find(
1154 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1155 if( it != bootstrapLinks.end() )
1156 bootstrapLinks.erase(it);
1157 }
1158
1159 // inform all registered services of the event
1160 BOOST_FOREACH( NodeListener* i, nodeListeners )
1161 i->onJoinFailed( spovnetId );
1162
1163 delete replyMsg;
1164 return true;
1165 }
1166
1167 // access has been granted -> continue!
1168 logging_info("Join request has been accepted for spovnet " <<
1169 spovnetId.toString() );
1170
1171 logging_debug( "Using bootstrap end-point "
1172 << replyMsg->getBootstrapEndpoint().toString() );
1173
1174 // create overlay structure from spovnet parameter set
1175 // if we have not boostrapped yet against some other node
1176 if( overlayInterface == NULL ){
1177
1178 logging_debug("first-time bootstrapping");
1179
1180 overlayInterface = OverlayFactory::create(
1181 *this, replyMsg->getParam(), nodeId, this );
1182
1183 // overlay structure supported? no-> fail!
1184 if( overlayInterface == NULL ) {
1185 logging_error( "overlay structure not supported" );
1186
1187 if( !bcLink.isUnspecified() ){
1188 bc->dropLink( bcLink );
1189
1190 vector<LinkID>::iterator it = std::find(
1191 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1192 if( it != bootstrapLinks.end() )
1193 bootstrapLinks.erase(it);
1194 }
1195
1196 // inform all registered services of the event
1197 BOOST_FOREACH( NodeListener* i, nodeListeners )
1198 i->onJoinFailed( spovnetId );
1199
1200 delete replyMsg;
1201 return true;
1202 }
1203
1204 // everything ok-> join the overlay!
1205 state = BaseOverlayStateCompleted;
1206 overlayInterface->createOverlay();
1207
1208 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1209 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1210
1211 // update ovlvis
1212 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1213
1214 // inform all registered services of the event
1215 BOOST_FOREACH( NodeListener* i, nodeListeners )
1216 i->onJoinCompleted( spovnetId );
1217
1218 delete replyMsg;
1219
1220 } else {
1221
1222 // this is not the first bootstrap, just join the additional node
1223 logging_debug("not first-time bootstrapping");
1224 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1225 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1226
1227 delete replyMsg;
1228
1229 } // if( overlayInterface == NULL )
1230
1231 return true;
1232}
1233
1234
1235bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1236 // get service
1237 const ServiceID& service = overlayMsg->getService();
1238 logging_debug( "Received data for service " << service.toString()
1239 << " on link " << overlayMsg->getDestinationLink().toString() );
1240
1241 // delegate data message
1242 getListener(service)->onMessage(
1243 overlayMsg,
1244 overlayMsg->getSourceNode(),
1245 overlayMsg->getDestinationLink()
1246 );
1247
1248 return true;
1249}
1250
1251
1252bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1253
1254 if( ld == NULL ) {
1255 logging_warn( "received overlay update message for link for "
1256 << "which we have no mapping" );
1257 return false;
1258 }
1259 logging_info("Received type update message on link " << ld );
1260
1261 // update our link mapping information for this link
1262 bool changed =
1263 ( ld->remoteNode != overlayMsg->getSourceNode() )
1264 || ( ld->service != overlayMsg->getService() );
1265
1266 // set parameters
1267 ld->up = true;
1268 ld->remoteNode = overlayMsg->getSourceNode();
1269 ld->remoteLink = overlayMsg->getSourceLink();
1270 ld->service = overlayMsg->getService();
1271 ld->autolink = overlayMsg->isAutoLink();
1272
1273 // if our link information changed, we send out an update, too
1274 if( changed ) {
1275 overlayMsg->swapRoles();
1276 overlayMsg->setSourceNode(nodeId);
1277 overlayMsg->setSourceLink(ld->overlayId);
1278 overlayMsg->setService(ld->service);
1279 send( overlayMsg, ld );
1280 }
1281
1282 // service registered? no-> error!
1283 if( !communicationListeners.contains( ld->service ) ) {
1284 logging_warn( "Link up: event listener has not been registered" );
1285 return false;
1286 }
1287
1288 // default or no service registered?
1289 CommunicationListener* listener = communicationListeners.get( ld->service );
1290 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1291 logging_warn("Link up: event listener is default or null!" );
1292 return true;
1293 }
1294
1295 // update descriptor
1296 ld->listener = listener;
1297 ld->setAutoUsed();
1298 ld->setAlive();
1299
1300 // ask the service whether it wants to accept this link
1301 if( !listener->onLinkRequest(ld->remoteNode) ) {
1302
1303 logging_debug("Link id=" << ld->overlayId.toString() <<
1304 " has been denied by service " << ld->service.toString() << ", dropping link");
1305
1306 // prevent onLinkDown calls to the service
1307 ld->listener = &CommunicationListener::DEFAULT;
1308
1309 // drop the link
1310 dropLink( ld->overlayId );
1311 return true;
1312 }
1313
1314 // set link up
1315 ld->up = true;
1316 logging_info( "Link has been accepted by service and is up: " << ld );
1317
1318 // auto links: link has been accepted -> send queued messages
1319 if( ld->messageQueue.size() > 0 ) {
1320 logging_info( "Sending out queued messages on link " << ld );
1321 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1322 sendMessage( msg, ld->overlayId );
1323 delete msg;
1324 }
1325 ld->messageQueue.clear();
1326 }
1327
1328 // call the notification functions
1329 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1330 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1331
1332 return true;
1333}
1334
1335/// handle a link request and reply
1336bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1337 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1338
1339 //TODO: Check if a request has already been sent using getSourceLink() ...
1340
1341 // create link descriptor
1342 LinkDescriptor* ldn = addDescriptor();
1343
1344 // flags
1345 ldn->up = true;
1346 ldn->fromRemote = true;
1347 ldn->relayed = true;
1348
1349 // parameters
1350 ldn->service = overlayMsg->getService();
1351 ldn->listener = getListener(ldn->service);
1352 ldn->remoteNode = overlayMsg->getSourceNode();
1353 ldn->remoteLink = overlayMsg->getSourceLink();
1354
1355 // update time-stamps
1356 ldn->setAlive();
1357 ldn->setAutoUsed();
1358
1359 // create reply message and send back!
1360 overlayMsg->swapRoles(); // swap source/destination
1361 overlayMsg->setType(OverlayMsg::typeLinkReply);
1362 overlayMsg->setSourceLink(ldn->overlayId);
1363 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1364 overlayMsg->setRelayed(true);
1365 send( overlayMsg, ld ); // send back to link
1366
1367 // inform listener
1368 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1369
1370 return true;
1371}
1372
1373bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1374
1375 // find link request
1376 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1377
1378 // not found? yes-> drop with error!
1379 if (ldn == NULL) {
1380 logging_error( "No link request pending for "
1381 << overlayMsg->getDestinationLink().toString() );
1382 return false;
1383 }
1384 logging_debug("Handling link reply for " << ldn )
1385
1386 // check if already up
1387 if (ldn->up) {
1388 logging_warn( "Link already up: " << ldn );
1389 return true;
1390 }
1391
1392 // debug message
1393 logging_debug( "Link request reply received. Establishing link"
1394 << " for service " << overlayMsg->getService().toString()
1395 << " with local id=" << overlayMsg->getDestinationLink()
1396 << " and remote link id=" << overlayMsg->getSourceLink()
1397 << " to " << overlayMsg->getSourceEndpoint().toString()
1398 );
1399
1400 // set local link descriptor data
1401 ldn->up = true;
1402 ldn->relayed = true;
1403 ldn->service = overlayMsg->getService();
1404 ldn->listener = getListener(ldn->service);
1405 ldn->remoteLink = overlayMsg->getSourceLink();
1406 ldn->remoteNode = overlayMsg->getSourceNode();
1407
1408 // update timestamps
1409 ldn->setAlive();
1410 ldn->setAutoUsed();
1411
1412 // auto links: link has been accepted -> send queued messages
1413 if( ldn->messageQueue.size() > 0 ) {
1414 logging_info( "Sending out queued messages on link " <<
1415 ldn->overlayId.toString() );
1416 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1417 sendMessage( msg, ldn->overlayId );
1418 delete msg;
1419 }
1420 ldn->messageQueue.clear();
1421 }
1422
1423 // inform listeners about new link
1424 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1425
1426 // try to replace relay link with direct link
1427 ldn->communicationId =
1428 bc->establishLink( overlayMsg->getSourceEndpoint() );
1429
1430 return true;
1431}
1432
1433/// handle a keep-alive message for a link
1434bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1435 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1436 if ( rld != NULL ) {
1437 logging_debug("Keep-Alive for " <<
1438 overlayMsg->getDestinationLink() );
1439 if (overlayMsg->isRouteRecord())
1440 rld->routeRecord = overlayMsg->getRouteRecord();
1441 rld->setAlive();
1442 return true;
1443 } else {
1444 logging_error("Keep-Alive for "
1445 << overlayMsg->getDestinationLink() << ": link unknown." );
1446 return false;
1447 }
1448}
1449
1450/// handle a direct link message
1451bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1452 logging_debug( "Received direct link replacement request" );
1453
1454 /// get destination overlay link
1455 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1456 if (rld == NULL || ld == NULL) {
1457 logging_error("Direct link replacement: Link "
1458 << overlayMsg->getDestinationLink() << "not found error." );
1459 return false;
1460 }
1461 logging_info( "Received direct link convert notification for " << rld );
1462
1463 // update information
1464 rld->communicationId = ld->communicationId;
1465 rld->communicationUp = true;
1466 rld->relayed = false;
1467
1468 // mark used and alive!
1469 rld->setAlive();
1470 rld->setAutoUsed();
1471
1472 // erase the original descriptor
1473 eraseDescriptor(ld->overlayId);
1474}
1475
1476/// handles an incoming message
1477bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1478 const LinkID bcLink ) {
1479 logging_debug( "Handling message: " << message->toString());
1480
1481 // decapsulate overlay message
1482 OverlayMsg* overlayMsg =
1483 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1484 if( overlayMsg == NULL ) return false;
1485
1486 // increase number of hops
1487 overlayMsg->increaseNumHops();
1488
1489 // refresh relay information
1490 refreshRelayInformation( overlayMsg, ld );
1491
1492 // update route record
1493 overlayMsg->addRouteRecord(nodeId);
1494
1495 // handle signaling messages (do not route!)
1496 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1497 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1498 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1499 delete overlayMsg;
1500 return true;
1501 }
1502
1503 // message for reached destination? no-> route message
1504 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1505 overlayMsg->getDestinationNode() != nodeId ) {
1506 logging_debug("Routing message "
1507 << " from " << overlayMsg->getSourceNode()
1508 << " to " << overlayMsg->getDestinationNode()
1509 );
1510 route( overlayMsg );
1511 delete overlayMsg;
1512 return true;
1513 }
1514
1515 // handle base overlay message
1516 bool ret = false; // return value
1517 switch ( overlayMsg->getType() ) {
1518
1519 // data transport messages
1520 case OverlayMsg::typeData:
1521 ret = handleData(overlayMsg, ld); break;
1522
1523 // overlay setup messages
1524 case OverlayMsg::typeJoinRequest:
1525 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1526 case OverlayMsg::typeJoinReply:
1527 ret = handleJoinReply(overlayMsg, bcLink ); break;
1528
1529 // link specific messages
1530 case OverlayMsg::typeLinkRequest:
1531 ret = handleLinkRequest(overlayMsg, ld ); break;
1532 case OverlayMsg::typeLinkReply:
1533 ret = handleLinkReply(overlayMsg, ld ); break;
1534 case OverlayMsg::typeLinkUpdate:
1535 ret = handleLinkUpdate(overlayMsg, ld ); break;
1536 case OverlayMsg::typeLinkAlive:
1537 ret = handleLinkAlive(overlayMsg, ld ); break;
1538 case OverlayMsg::typeLinkDirect:
1539 ret = handleLinkDirect(overlayMsg, ld ); break;
1540
1541 // handle unknown message type
1542 default: {
1543 logging_error( "received message in invalid state! don't know " <<
1544 "what to do with this message of type " << overlayMsg->getType() );
1545 ret = false;
1546 break;
1547 }
1548 }
1549
1550 // free overlay message and return value
1551 delete overlayMsg;
1552 return ret;
1553}
1554
1555// ----------------------------------------------------------------------------
1556
1557void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1558
1559 logging_debug( "broadcasting message to all known nodes " <<
1560 "in the overlay from service " + service.toString() );
1561
1562 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1563 OverlayInterface::NodeList::iterator i = nodes.begin();
1564 for(; i != nodes.end(); i++ ) {
1565 if( *i == nodeId) continue; // don't send to ourselfs
1566 sendMessage( message, *i, service );
1567 }
1568}
1569
1570/// return the overlay neighbors
1571vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1572 // the known nodes _can_ also include our node, so we remove ourself
1573 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1574 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1575 if( i != nodes.end() ) nodes.erase( i );
1576 return nodes;
1577}
1578
1579const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1580 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1581 const LinkDescriptor* ld = getDescriptor(lid);
1582 if( ld == NULL ) return NodeID::UNSPECIFIED;
1583 else return ld->remoteNode;
1584}
1585
1586vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1587 vector<LinkID> linkvector;
1588 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1589 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1590 linkvector.push_back( ld->overlayId );
1591 }
1592 }
1593 return linkvector;
1594}
1595
1596
1597void BaseOverlay::onNodeJoin(const NodeID& node) {
1598 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1599 if( i == joiningNodes.end() ) return;
1600
1601 logging_info( "node has successfully joined baseoverlay and overlay structure "
1602 << node.toString() );
1603
1604 joiningNodes.erase( i );
1605}
1606
1607void BaseOverlay::eventFunction() {
1608 stabilizeRelays();
1609 stabilizeLinks();
1610}
1611
1612}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.