source: source/ariba/overlay/BaseOverlay.cpp@ 5893

Last change on this file since 5893 was 5893, checked in by mies, 15 years ago
File size: 48.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/misc/OvlVis.h"
57
58namespace ariba {
59namespace overlay {
60
61/* *****************************************************************************
62 * PREREQUESITES
63 * ****************************************************************************/
64
65CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
66 if( !communicationListeners.contains( service ) ) {
67 logging_error( "No listener found for service " << service.toString() );
68 return NULL;
69 }
70 CommunicationListener* listener = communicationListeners.get( service );
71 assert( listener != NULL );
72 return listener;
73}
74
75// link descriptor handling ----------------------------------------------------
76
77LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
78 BOOST_FOREACH( LinkDescriptor* lp, links )
79 if ((communication ? lp->communicationId : lp->overlayId) == link)
80 return lp;
81 return NULL;
82}
83
84const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
85 BOOST_FOREACH( const LinkDescriptor* lp, links )
86 if ((communication ? lp->communicationId : lp->overlayId) == link)
87 return lp;
88 return NULL;
89}
90
91/// erases a link descriptor
92void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
93 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
94 LinkDescriptor* ld = *i;
95 if ((communication ? ld->communicationId : ld->overlayId) == link) {
96 delete ld;
97 links.erase(i);
98 break;
99 }
100 }
101}
102
103/// adds a link descriptor
104LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
105 LinkDescriptor* desc = getDescriptor( link );
106 if ( desc == NULL ) {
107 desc = new LinkDescriptor();
108 if (!link.isUnspecified()) desc->overlayId = link;
109 links.push_back(desc);
110 }
111 return desc;
112}
113
114/// returns a auto-link descriptor
115LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
116 // search for a descriptor that is already up
117 BOOST_FOREACH( LinkDescriptor* lp, links )
118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
119 return lp;
120 // if not found, search for one that is about to come up...
121 BOOST_FOREACH( LinkDescriptor* lp, links )
122 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
123 return lp;
124 return NULL;
125}
126
127/// stabilizes link information
128void BaseOverlay::stabilizeLinks() {
129 // send keep-alive messages over established links
130 BOOST_FOREACH( LinkDescriptor* ld, links ) {
131 if (!ld->up) continue;
132 OverlayMsg msg( OverlayMsg::typeLinkAlive,
133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
134 msg.setRelayed(true);
135 if (ld->relayed) msg.setRouteRecord(true);
136 send_link( &msg, ld->overlayId );
137 }
138
139 // iterate over all links and check for time boundaries
140 vector<LinkDescriptor*> oldlinks;
141 time_t now = time(NULL);
142 BOOST_FOREACH( LinkDescriptor* ld, links ) {
143
144 // keep alives and not up? yes-> link connection request is stale!
145 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
146
147 // increase counter
148 ld->keepAliveMissed++;
149
150 // missed more than four keep-alive messages (10 sec)? -> drop link
151 if (ld->keepAliveMissed > 4) {
152 logging_info( "Link connection request is stale, closing: " << ld );
153 oldlinks.push_back( ld );
154 continue;
155 }
156 }
157
158 if (!ld->up) continue;
159
160 // remote used as relay flag
161 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
162 ld->relaying = false;
163
164 // drop links that are dropped and not used as relay
165 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
166 oldlinks.push_back( ld );
167 continue;
168 }
169
170 // auto-link time exceeded?
171 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
172 oldlinks.push_back( ld );
173 continue;
174 }
175
176 // keep alives missed? yes->
177 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
178
179 // increase counter
180 ld->keepAliveMissed++;
181
182 // missed more than four keep-alive messages (4 sec)? -> drop link
183 if (ld->keepAliveMissed >= 4) {
184 logging_info( "Link is stale, closing: " << ld );
185 oldlinks.push_back( ld );
186 continue;
187 }
188 }
189 }
190
191 // drop links
192 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
193 logging_info( "Link timed out. Dropping " << ld );
194 ld->relaying = false;
195 dropLink( ld->overlayId );
196 }
197
198 // show link state
199 counter++;
200 if (counter>=4) showLinks();
201 if (counter>=4 || counter<0) counter = 0;
202}
203
204/// shows the current link state
205void BaseOverlay::showLinks() {
206 int i=0;
207 logging_info("--- link state -------------------------------");
208 BOOST_FOREACH( LinkDescriptor* ld, links ) {
209 logging_info("link " << i << ": " << ld);
210 i++;
211 }
212 logging_info("----------------------------------------------");
213}
214
215/// compares two arbitrary links to the same node
216int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
217 LinkDescriptor* lhsld = getDescriptor(lhs);
218 LinkDescriptor* rhsld = getDescriptor(rhs);
219 if (lhsld==NULL || rhsld==NULL
220 || !lhsld->up || !rhsld->up
221 || lhsld->remoteNode != rhsld->remoteNode) return -1;
222
223 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
224 return -1;
225
226 return 1;
227}
228
229
230// internal message delivery ---------------------------------------------------
231
232/// routes a message to its destination node
233void BaseOverlay::route( OverlayMsg* message ) {
234
235 // exceeded time-to-live? yes-> drop message
236 if (message->getNumHops() > message->getTimeToLive()) {
237 logging_warn("Message exceeded TTL -> drop message!");
238 return;
239
240 // no-> forward message
241 } else {
242 // destinastion myself? yes-> handle message
243 if (message->getDestinationNode() == nodeId) {
244 logging_warn("Usually I should not route messages to myself!");
245 Message msg;
246 msg.encapsulate(message);
247 handleMessage( &msg, NULL );
248 } else {
249 // no->send message to next hop
250 send( message, message->getDestinationNode() );
251 }
252 }
253}
254
255/// sends a message to another node, delivers it to the base overlay class
256seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
257 LinkDescriptor* next_link = NULL;
258
259 // drop messages to unspecified destinations
260 if (destination.isUnspecified()) return -1;
261
262 // send messages to myself -> handle message and drop warning!
263 if (destination == nodeId) {
264 logging_warn("Sent message to myself. Handling message.")
265 Message msg;
266 msg.encapsulate(message);
267 handleMessage( &msg, NULL );
268 return -1;
269 }
270
271 // relay path known? yes-> send over relay link
272 next_link = getRelayLinkTo( destination );
273 if (next_link != NULL) {
274 next_link->setRelaying();
275 return send(message, next_link);
276 }
277
278 // no-> relay path! route over overlay path
279 LinkID next_id = overlayInterface->getNextLinkId( destination );
280 if (next_id.isUnspecified()) {
281 logging_error("Could not send message. No next hop found to " << destination );
282 return -1;
283 }
284
285 // get link descriptor, up and running? yes-> send message
286 next_link = getDescriptor(next_id);
287 if (next_link != NULL && next_link->up)
288 return send(message, next_link);
289
290 // no-> error, dropping message
291 else {
292 logging_error("Could not send message. Link not known or up");
293 return -1;
294 }
295
296 // not reached-> fail
297 return -1;
298}
299
300/// send a message using a link descriptor, delivers it to the base overlay class
301seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ld, bool ignore_down ) {
302 // check if null
303 if (ld == NULL) {
304 logging_error("Can not send message to " << message->getDestinationAddress());
305 return -1;
306 }
307
308 // check if up
309 if (!ld->up && !ignore_down) {
310 logging_error("Can not send message. Link not up:" << ld );
311 return -1;
312 }
313
314 // handle relayed link
315 if (ld->relayed) {
316 logging_debug("send(): Resolving direct link for relayed link to "
317 << ld->remoteNode);
318 ld = getRelayLinkTo( ld->remoteNode );
319 if (ld==NULL) {
320 LinkID lnk = overlayInterface->getNextLinkId(ld->remoteNode);
321 if (!lnk.isUnspecified())
322 ld = getDescriptor(lnk);
323 if (ld!=NULL && ld->relayed)
324 ld = NULL;
325 }
326 if (ld==NULL) {
327 logging_error("Direct link not found.");
328 return -1;
329 }
330 message->setRelayed();
331 ld->setRelaying();
332 }
333
334 // handle direct link
335 if (ld->communicationUp) {
336 logging_debug("send(): Sending message over direct link.");
337 return bc->sendMessage( ld->communicationId, message );
338 } else {
339 logging_error("send(): Could not send mesage. "
340 "Not a relayed link and direct link is not up.");
341 return -1;
342 }
343 return -1;
344}
345
346seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
347 const ServiceID& service) {
348 message->setSourceNode(nodeId);
349 message->setDestinationNode(remote);
350 message->setService(service);
351 send( message, remote );
352}
353
354seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
355 LinkDescriptor* ld = getDescriptor(link);
356 if (ld==NULL) {
357 logging_error("Cannot find descriptor to link id=" << link.toString());
358 return -1;
359 }
360 message->setSourceNode(nodeId);
361 message->setDestinationNode(ld->remoteNode);
362
363 message->setSourceLink(ld->overlayId);
364 message->setDestinationLink(ld->remoteLink);
365
366 message->setService(ld->service);
367 return send( message, ld, ignore_down );
368}
369
370// relay route management ------------------------------------------------------
371
372/// stabilize relay information
373void BaseOverlay::stabilizeRelays() {
374 vector<relay_route>::iterator i = relay_routes.begin();
375 while (i!=relay_routes.end() ) {
376 relay_route& route = *i;
377 LinkDescriptor* ld = getDescriptor(route.link);
378 if (ld==NULL
379 || !ld->up
380 || ld->keepAliveMissed != 0
381 || !ld->communicationUp
382 || difftime(route.used, time(NULL)) > 4) {
383 logging_info("Forgetting relay information to node "
384 << route.node.toString() );
385 i = relay_routes.erase(i);
386 } else
387 i++;
388 }
389}
390
391void BaseOverlay::removeRelayLink( const LinkID& link ) {
392 vector<relay_route>::iterator i = relay_routes.begin();
393 while (i!=relay_routes.end() ) {
394 relay_route& route = *i;
395 if (route.link == link ) i = relay_routes.erase(i); else i++;
396 }
397}
398
399void BaseOverlay::removeRelayNode( const NodeID& remote ) {
400 vector<relay_route>::iterator i = relay_routes.begin();
401 while (i!=relay_routes.end() ) {
402 relay_route& route = *i;
403 if (route.node == remote ) i = relay_routes.erase(i); else i++;
404 }
405}
406
407/// refreshes relay information
408void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
409
410 // handle relayed messages from real links only
411 if (ld == NULL
412 || ld->relayed
413 || !message->isRelayed()
414// || !(message->getService() == OverlayInterface::OVERLAY_SERVICE_ID)
415 || message->getSourceNode()==nodeId ) return;
416
417 // check wheter this node is already part of the routing table
418 LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode());
419 if (next_link == ld->overlayId) return;
420 ld->setRelaying();
421
422 // try to find source node
423 BOOST_FOREACH( relay_route& route, relay_routes ) {
424
425 // relay route found? yes->
426 if ( route.node == message->getSourceNode() ) {
427
428 // refresh timer
429 route.used = time(NULL);
430
431 // route has a shorter hop count? yes-> replace
432 if (route.hops > message->getNumHops() ) {
433 logging_info("Updating relay information to node "
434 << route.node.toString()
435 << " reducing to " << message->getNumHops() << " hops.");
436 route.hops = message->getNumHops();
437 route.link = ld->overlayId;
438 }
439 return;
440 }
441 }
442
443 // not found-> add new entry
444 relay_route route;
445 route.hops = message->getNumHops();
446 route.link = ld->overlayId;
447 route.node = message->getSourceNode();
448 route.used = time(NULL);
449 logging_info("Remembering relay information to node " << route.node.toString());
450 relay_routes.push_back(route);
451}
452
453/// returns a known "vital" relay link which is up and running
454LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
455 // try to find source node
456 BOOST_FOREACH( relay_route& route, relay_routes ) {
457 if (route.node == remote ) {
458 LinkDescriptor* ld = getDescriptor( route.link );
459 if (ld==NULL || !ld->up) return NULL; else {
460 route.used = time(NULL);
461 return ld;
462 }
463 }
464 }
465 return NULL;
466}
467
468/* *****************************************************************************
469 * PUBLIC MEMBERS
470 * ****************************************************************************/
471
472use_logging_cpp(BaseOverlay);
473
474// ----------------------------------------------------------------------------
475
476BaseOverlay::BaseOverlay() :
477 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
478 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
479 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
480}
481
482BaseOverlay::~BaseOverlay() {
483}
484
485// ----------------------------------------------------------------------------
486
487void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
488 logging_info("Starting...");
489
490 // set parameters
491 bc = &_basecomm;
492 nodeId = _nodeid;
493
494 // register at base communication
495 bc->registerMessageReceiver( this );
496 bc->registerEventListener( this );
497
498 // timer for auto link management
499 Timer::setInterval( 1000 );
500 Timer::start();
501
502 started = true;
503 state = BaseOverlayStateInvalid;
504}
505
506void BaseOverlay::stop() {
507 logging_info("Stopping...");
508
509 // stop timer
510 Timer::stop();
511
512 // delete oberlay interface
513 if(overlayInterface != NULL) {
514 delete overlayInterface;
515 overlayInterface = NULL;
516 }
517
518 // unregister at base communication
519 bc->unregisterMessageReceiver( this );
520 bc->unregisterEventListener( this );
521
522 started = false;
523 state = BaseOverlayStateInvalid;
524}
525
526bool BaseOverlay::isStarted(){
527 return started;
528}
529
530// ----------------------------------------------------------------------------
531
532void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
533 const EndpointDescriptor& bootstrapEp) {
534
535 if(id != spovnetId){
536 logging_error("attempt to join against invalid spovnet, call initiate first");
537 return;
538 }
539
540
541 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
542 logging_info( "Starting to join spovnet " << id.toString() <<
543 " with nodeid " << nodeId.toString());
544
545 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
546
547 // bootstrap against ourselfs
548 logging_debug("joining spovnet locally");
549
550 overlayInterface->joinOverlay();
551 state = BaseOverlayStateCompleted;
552 BOOST_FOREACH( NodeListener* i, nodeListeners )
553 i->onJoinCompleted( spovnetId );
554
555 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
556 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
557
558 logging_debug("starting overlay bootstrap module");
559 overlayBootstrap.start(this, spovnetId, nodeId);
560 overlayBootstrap.publish(bc->getEndpointDescriptor());
561
562 } else {
563
564 // bootstrap against another node
565 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
566
567 const LinkID& lnk = bc->establishLink( bootstrapEp );
568 bootstrapLinks.push_back(lnk);
569 logging_info("join process initiated for " << id.toString() << "...");
570 }
571}
572
573void BaseOverlay::leaveSpoVNet() {
574
575 logging_info( "Leaving spovnet " << spovnetId );
576 bool ret = ( state != this->BaseOverlayStateInvalid );
577
578 logging_debug("stopping overlay bootstrap module");
579 overlayBootstrap.stop();
580 overlayBootstrap.revoke();
581
582 logging_debug( "Dropping all auto-links" );
583
584 // gather all service links
585 vector<LinkID> servicelinks;
586 BOOST_FOREACH( LinkDescriptor* ld, links ) {
587 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
588 servicelinks.push_back( ld->overlayId );
589 }
590
591 // drop all service links
592 BOOST_FOREACH( LinkID lnk, servicelinks )
593 dropLink( lnk );
594
595 // let the node leave the spovnet overlay interface
596 logging_debug( "Leaving overlay" );
597 if( overlayInterface != NULL )
598 overlayInterface->leaveOverlay();
599
600 // drop still open bootstrap links
601 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
602 bc->dropLink( lnk );
603
604 // change to inalid state
605 state = BaseOverlayStateInvalid;
606 //ovl.visShutdown( ovlId, nodeId, string("") );
607
608 // inform all registered services of the event
609 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
610 if( ret ) i->onLeaveCompleted( spovnetId );
611 else i->onLeaveFailed( spovnetId );
612 }
613}
614
615void BaseOverlay::createSpoVNet(const SpoVNetID& id,
616 const OverlayParameterSet& param,
617 const SecurityParameterSet& sec,
618 const QoSParameterSet& qos) {
619
620 // set the state that we are an initiator, this way incoming messages are
621 // handled correctly
622 logging_info( "creating spovnet " + id.toString() <<
623 " with nodeid " << nodeId.toString() );
624
625 spovnetId = id;
626
627 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
628 if( overlayInterface == NULL ) {
629 logging_fatal( "overlay structure not supported" );
630 state = BaseOverlayStateInvalid;
631
632 BOOST_FOREACH( NodeListener* i, nodeListeners )
633 i->onJoinFailed( spovnetId );
634
635 return;
636 }
637}
638
639// ----------------------------------------------------------------------------
640
641const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
642 const NodeID& remoteId, const ServiceID& service ) {
643
644 // establish link via overlay
645 if (!remoteId.isUnspecified())
646 return establishLink( remoteId, service );
647 else
648
649 // establish link directly if only ep is known
650 if (remoteId.isUnspecified())
651 return establishDirectLink(remoteEp, service );
652
653}
654
655/// call base communication's establish link and add link mapping
656const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
657 const ServiceID& service ) {
658
659 /// find a service listener
660 if( !communicationListeners.contains( service ) ) {
661 logging_error( "No listener registered for service id=" << service.toString() );
662 return LinkID::UNSPECIFIED;
663 }
664 CommunicationListener* listener = communicationListeners.get( service );
665 assert( listener != NULL );
666
667 // create descriptor
668 LinkDescriptor* ld = addDescriptor();
669 ld->relayed = false;
670 ld->listener = listener;
671 ld->service = service;
672 ld->communicationId = bc->establishLink( ep );
673
674 /// establish link and add mapping
675 logging_info("Establishing direct link " << ld->communicationId.toString()
676 << " using " << ep.toString());
677
678 return ld->communicationId;
679}
680
681/// establishes a link between two arbitrary nodes
682const LinkID BaseOverlay::establishLink( const NodeID& remote,
683 const ServiceID& service ) {
684
685 // do not establish a link to myself!
686 if (remote == nodeId) return LinkID::UNSPECIFIED;
687
688 // create a link descriptor
689 LinkDescriptor* ld = addDescriptor();
690 ld->relayed = true;
691 ld->remoteNode = remote;
692 ld->service = service;
693 ld->listener = getListener(ld->service);
694
695 // create link request message
696 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
697 msg.setSourceLink(ld->overlayId);
698 msg.setRelayed(true);
699
700 // debug message
701 logging_info(
702 "Sending link request with"
703 << " link=" << ld->overlayId.toString()
704 << " node=" << ld->remoteNode.toString()
705 << " serv=" << ld->service.toString()
706 );
707
708 // sending message to node
709 send_node( &msg, ld->remoteNode, ld->service );
710
711 return ld->overlayId;
712}
713
714/// drops an established link
715void BaseOverlay::dropLink(const LinkID& link) {
716 logging_info( "Dropping link (initiated locally):" << link.toString() );
717
718 // find the link item to drop
719 LinkDescriptor* ld = getDescriptor(link);
720 if( ld == NULL ) {
721 logging_warn( "Can't drop link, link is unknown!");
722 return;
723 }
724
725 // delete all queued messages
726 if( ld->messageQueue.size() > 0 ) {
727 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
728 << ld->messageQueue.size() << " waiting messages" );
729 ld->flushQueue();
730 }
731
732 // inform sideport and listener
733 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
734 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
735
736 // do not drop relay links
737 if (!ld->relaying) {
738 // drop the link in base communication
739 if (ld->communicationUp) bc->dropLink( ld->communicationId );
740
741 // erase descriptor
742 eraseDescriptor( ld->overlayId );
743 } else {
744 ld->dropAfterRelaying = true;
745 }
746}
747
748// ----------------------------------------------------------------------------
749
750/// internal send message, always use this functions to send messages over links
751seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
752 logging_debug( "Sending data message on link " << link.toString() );
753
754 // get the mapping for this link
755 LinkDescriptor* ld = getDescriptor(link);
756 if( ld == NULL ) {
757 logging_error("Could not send message. "
758 << "Link not found id=" << link.toString());
759 return -1;
760 }
761
762 // check if the link is up yet, if its an auto link queue message
763 if( !ld->up ) {
764 ld->setAutoUsed();
765 if( ld->autolink ) {
766 logging_info("Auto-link " << link.toString() << " not up, queue message");
767 Data data = data_serialize( message );
768 const_cast<Message*>(message)->dropPayload();
769 ld->messageQueue.push_back( new Message(data) );
770 } else {
771 logging_error("Link " << link.toString() << " not up, drop message");
772 }
773 return -1;
774 }
775
776 // compile overlay message (has service and node id)
777 OverlayMsg overmsg( OverlayMsg::typeData );
778 overmsg.encapsulate( const_cast<Message*>(message) );
779
780 // send message over relay/direct/overlay
781 return send_link( &overmsg, ld->overlayId );
782}
783
784seqnum_t BaseOverlay::sendMessage(const Message* message,
785 const NodeID& node, const ServiceID& service) {
786
787 // find link for node and service
788 LinkDescriptor* ld = getAutoDescriptor( node, service );
789
790 // if we found no link, create an auto link
791 if( ld == NULL ) {
792
793 // debug output
794 logging_info( "No link to send message to node "
795 << node.toString() << " found for service "
796 << service.toString() << ". Creating auto link ..."
797 );
798
799 // call base overlay to create a link
800 LinkID link = establishLink( node, service );
801 ld = getDescriptor( link );
802 if( ld == NULL ) {
803 logging_error( "Failed to establish auto-link.");
804 return -1;
805 }
806 ld->autolink = true;
807
808 logging_debug( "Auto-link establishment in progress to node "
809 << node.toString() << " with link id=" << link.toString() );
810 }
811 assert(ld != NULL);
812
813 // mark the link as used, as we now send a message through it
814 ld->setAutoUsed();
815
816 // send / queue message
817 return sendMessage( message, ld->overlayId );
818}
819
820// ----------------------------------------------------------------------------
821
822const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
823 const LinkID& link) const {
824
825 // return own end-point descriptor
826 if( link == LinkID::UNSPECIFIED )
827 return bc->getEndpointDescriptor();
828
829 // find link descriptor. not found -> return unspecified
830 const LinkDescriptor* ld = getDescriptor(link);
831 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
832
833 // return endpoint-descriptor from base communication
834 return bc->getEndpointDescriptor( ld->communicationId );
835}
836
837const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
838 const NodeID& node) const {
839
840 // return own end-point descriptor
841 if( node == nodeId || node == NodeID::UNSPECIFIED )
842 return bc->getEndpointDescriptor();
843
844 // no joined and request remote descriptor? -> fail!
845 if( overlayInterface == NULL ) {
846 logging_error( "overlay interface not set, cannot resolve endpoint" );
847 return EndpointDescriptor::UNSPECIFIED();
848 }
849
850 // resolve end-point descriptor from the base-overlay routing table
851 return overlayInterface->resolveNode( node );
852}
853
854// ----------------------------------------------------------------------------
855
856bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
857 sideport = _sideport;
858 _sideport->configure( this );
859}
860
861bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
862 sideport = &SideportListener::DEFAULT;
863}
864
865// ----------------------------------------------------------------------------
866
867bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
868 logging_debug( "binding communication listener " << listener
869 << " on serviceid " << sid.toString() );
870
871 if( communicationListeners.contains( sid ) ) {
872 logging_error( "some listener already registered for service id "
873 << sid.toString() );
874 return false;
875 }
876
877 communicationListeners.registerItem( listener, sid );
878 return true;
879}
880
881
882bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
883 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
884
885 if( !communicationListeners.contains( sid ) ) {
886 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
887 return false;
888 }
889
890 if( communicationListeners.get(sid) != listener ) {
891 logging_warn( "listener bound to service id " << sid.toString()
892 << " is different than listener trying to unbind" );
893 return false;
894 }
895
896 communicationListeners.unregisterItem( sid );
897 return true;
898}
899
900// ----------------------------------------------------------------------------
901
902bool BaseOverlay::bind(NodeListener* listener) {
903 logging_debug( "Binding node listener " << listener );
904
905 // already bound? yes-> warning
906 NodeListenerVector::iterator i =
907 find( nodeListeners.begin(), nodeListeners.end(), listener );
908 if( i != nodeListeners.end() ) {
909 logging_warn("Node listener " << listener << " is already bound!" );
910 return false;
911 }
912
913 // no-> add
914 nodeListeners.push_back( listener );
915 return true;
916}
917
918bool BaseOverlay::unbind(NodeListener* listener) {
919 logging_debug( "Unbinding node listener " << listener );
920
921 // already unbound? yes-> warning
922 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
923 if( i == nodeListeners.end() ) {
924 logging_warn( "Node listener " << listener << " is not bound!" );
925 return false;
926 }
927
928 // no-> remove
929 nodeListeners.erase( i );
930 return true;
931}
932
933// ----------------------------------------------------------------------------
934
935void BaseOverlay::onLinkUp(const LinkID& id,
936 const address_v* local, const address_v* remote) {
937 logging_debug( "Link up with base communication link id=" << id );
938
939 // get descriptor for link
940 LinkDescriptor* ld = getDescriptor(id, true);
941
942 // handle bootstrap link we initiated
943 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
944 logging_info(
945 "Join has been initiated by me and the link is now up. " <<
946 "Sending out join request for SpoVNet " << spovnetId.toString()
947 );
948
949 // send join request message
950 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
951 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
952 JoinRequest joinRequest( spovnetId, nodeId );
953 overlayMsg.encapsulate( &joinRequest );
954 bc->sendMessage( id, &overlayMsg );
955 return;
956 }
957
958 // no link found? -> link establishment from remote, add one!
959 if (ld == NULL) {
960 ld = addDescriptor( id );
961 logging_info( "onLinkUp (remote request) descriptor: " << ld );
962
963 // update descriptor
964 ld->fromRemote = true;
965 ld->communicationId = id;
966 ld->communicationUp = true;
967 ld->setAutoUsed();
968 ld->setAlive();
969
970 // in this case, do not inform listener, since service it unknown
971 // -> wait for update message!
972
973 // link mapping found? -> send update message with node-id and service id
974 } else {
975 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
976
977 // update descriptor
978 ld->setAutoUsed();
979 ld->setAlive();
980 ld->communicationUp = true;
981 ld->fromRemote = false;
982
983 // if link is a relayed link->convert to direct link
984 if (ld->relayed) {
985 logging_info( "Converting to direct link: " << ld );
986 ld->up = true;
987 ld->relayed = false;
988 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
989 overMsg.setSourceLink( ld->overlayId );
990 overMsg.setDestinationLink( ld->remoteLink );
991 send_link( &overMsg, ld->overlayId );
992 } else {
993 // note: necessary to validate the link on the remote side!
994 logging_info( "Sending out update" <<
995 " for service " << ld->service.toString() <<
996 " with local node id " << nodeId.toString() <<
997 " on link " << ld->overlayId.toString() );
998
999 // compile and send update message
1000 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1001 overlayMsg.setSourceLink(ld->overlayId);
1002 overlayMsg.setAutoLink( ld->autolink );
1003 send_link( &overlayMsg, ld->overlayId, true );
1004 }
1005 }
1006}
1007
1008void BaseOverlay::onLinkDown(const LinkID& id,
1009 const address_v* local, const address_v* remote) {
1010
1011 // erase bootstrap links
1012 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1013 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1014
1015 // get descriptor for link
1016 LinkDescriptor* ld = getDescriptor(id, true);
1017 if ( ld == NULL ) return; // not found? ->ignore!
1018 logging_info( "onLinkDown descriptor: " << ld );
1019
1020 // removing relay link information
1021 removeRelayLink(ld->overlayId);
1022
1023 // inform listeners about link down
1024 ld->communicationUp = false;
1025 if (!ld->service.isUnspecified()) {
1026 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1027 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1028 }
1029
1030 // delete all queued messages (auto links)
1031 if( ld->messageQueue.size() > 0 ) {
1032 logging_warn( "Dropping link " << id.toString() << " that has "
1033 << ld->messageQueue.size() << " waiting messages" );
1034 ld->flushQueue();
1035 }
1036
1037 // erase mapping
1038 eraseDescriptor(ld->overlayId);
1039}
1040
1041void BaseOverlay::onLinkChanged(const LinkID& id,
1042 const address_v* oldlocal, const address_v* newlocal,
1043 const address_v* oldremote, const address_v* newremote) {
1044
1045 // get descriptor for link
1046 LinkDescriptor* ld = getDescriptor(id, true);
1047 if ( ld == NULL ) return; // not found? ->ignore!
1048 logging_debug( "onLinkChanged descriptor: " << ld );
1049
1050 // inform listeners
1051 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1052 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1053
1054 // autolinks: refresh timestamp
1055 ld->setAutoUsed();
1056}
1057
1058void BaseOverlay::onLinkFail(const LinkID& id,
1059 const address_v* local, const address_v* remote) {
1060 logging_debug( "Link fail with base communication link id=" << id );
1061
1062 // erase bootstrap links
1063 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1064 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1065
1066 // get descriptor for link
1067 LinkDescriptor* ld = getDescriptor(id, true);
1068 if ( ld == NULL ) return; // not found? ->ignore!
1069 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1070
1071 // inform listeners
1072 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1073 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1074}
1075
1076void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1077 const address_v* remote, const QoSParameterSet& qos) {
1078 logging_debug( "Link quality changed with base communication link id=" << id );
1079
1080 // get descriptor for link
1081 LinkDescriptor* ld = getDescriptor(id, true);
1082 if ( ld == NULL ) return; // not found? ->ignore!
1083 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1084}
1085
1086bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1087 const address_v* remote ) {
1088 logging_debug("Accepting link request from " << remote->to_string() );
1089 return true;
1090}
1091
1092/// handles a message from base communication
1093bool BaseOverlay::receiveMessage(const Message* message,
1094 const LinkID& link, const NodeID& ) {
1095 // get descriptor for link
1096 LinkDescriptor* ld = getDescriptor( link, true );
1097 return handleMessage( message, ld, link );
1098}
1099
1100// ----------------------------------------------------------------------------
1101
1102/// Handle spovnet instance join requests
1103bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1104
1105 // decapsulate message
1106 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1107 logging_info( "Received join request for spovnet " <<
1108 joinReq->getSpoVNetID().toString() );
1109
1110 // check spovnet id
1111 if( joinReq->getSpoVNetID() != spovnetId ) {
1112 logging_error(
1113 "Received join request for spovnet we don't handle " <<
1114 joinReq->getSpoVNetID().toString() );
1115 return false;
1116 }
1117
1118 // TODO: here you can implement mechanisms to deny joining of a node
1119 bool allow = true;
1120 logging_info( "Sending join reply for spovnet " <<
1121 spovnetId.toString() << " to node " <<
1122 overlayMsg->getSourceNode().toString() <<
1123 ". Result: " << (allow ? "allowed" : "denied") );
1124 joiningNodes.push_back( overlayMsg->getSourceNode() );
1125
1126 // return overlay parameters
1127 assert( overlayInterface != NULL );
1128 logging_debug( "Using bootstrap end-point "
1129 << getEndpointDescriptor().toString() )
1130 OverlayParameterSet parameters = overlayInterface->getParameters();
1131 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1132 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1133 JoinReply replyMsg( spovnetId, parameters,
1134 allow, getEndpointDescriptor() );
1135 retmsg.encapsulate(&replyMsg);
1136 bc->sendMessage( bcLink, &retmsg );
1137
1138 return true;
1139}
1140
1141/// Handle replies to spovnet instance join requests
1142bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1143 // decapsulate message
1144 logging_debug("received join reply message");
1145 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1146
1147 // correct spovnet?
1148 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1149 logging_error( "Received SpoVNet join reply for " <<
1150 replyMsg->getSpoVNetID().toString() <<
1151 " != " << spovnetId.toString() );
1152 delete replyMsg;
1153 return false;
1154 }
1155
1156 // access granted? no -> fail
1157 if( !replyMsg->getJoinAllowed() ) {
1158 logging_error( "Our join request has been denied" );
1159
1160 // drop initiator link
1161 if( !bcLink.isUnspecified() ){
1162 bc->dropLink( bcLink );
1163
1164 vector<LinkID>::iterator it = std::find(
1165 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1166 if( it != bootstrapLinks.end() )
1167 bootstrapLinks.erase(it);
1168 }
1169
1170 // inform all registered services of the event
1171 BOOST_FOREACH( NodeListener* i, nodeListeners )
1172 i->onJoinFailed( spovnetId );
1173
1174 delete replyMsg;
1175 return true;
1176 }
1177
1178 // access has been granted -> continue!
1179 logging_info("Join request has been accepted for spovnet " <<
1180 spovnetId.toString() );
1181
1182 logging_debug( "Using bootstrap end-point "
1183 << replyMsg->getBootstrapEndpoint().toString() );
1184
1185 // create overlay structure from spovnet parameter set
1186 // if we have not boostrapped yet against some other node
1187 if( overlayInterface == NULL ){
1188
1189 logging_debug("first-time bootstrapping");
1190
1191 overlayInterface = OverlayFactory::create(
1192 *this, replyMsg->getParam(), nodeId, this );
1193
1194 // overlay structure supported? no-> fail!
1195 if( overlayInterface == NULL ) {
1196 logging_error( "overlay structure not supported" );
1197
1198 if( !bcLink.isUnspecified() ){
1199 bc->dropLink( bcLink );
1200
1201 vector<LinkID>::iterator it = std::find(
1202 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1203 if( it != bootstrapLinks.end() )
1204 bootstrapLinks.erase(it);
1205 }
1206
1207 // inform all registered services of the event
1208 BOOST_FOREACH( NodeListener* i, nodeListeners )
1209 i->onJoinFailed( spovnetId );
1210
1211 delete replyMsg;
1212 return true;
1213 }
1214
1215 // everything ok-> join the overlay!
1216 state = BaseOverlayStateCompleted;
1217 overlayInterface->createOverlay();
1218
1219 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1220 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1221
1222 // update ovlvis
1223 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1224
1225 // inform all registered services of the event
1226 BOOST_FOREACH( NodeListener* i, nodeListeners )
1227 i->onJoinCompleted( spovnetId );
1228
1229 delete replyMsg;
1230
1231 } else {
1232
1233 // this is not the first bootstrap, just join the additional node
1234 logging_debug("not first-time bootstrapping");
1235 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1236 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1237
1238 delete replyMsg;
1239
1240 } // if( overlayInterface == NULL )
1241
1242 return true;
1243}
1244
1245
1246bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1247 // get service
1248 const ServiceID& service = overlayMsg->getService();
1249 logging_debug( "Received data for service " << service.toString()
1250 << " on link " << overlayMsg->getDestinationLink().toString() );
1251
1252 // delegate data message
1253 getListener(service)->onMessage(
1254 overlayMsg,
1255 overlayMsg->getSourceNode(),
1256 overlayMsg->getDestinationLink()
1257 );
1258
1259 return true;
1260}
1261
1262
1263bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1264
1265 if( ld == NULL ) {
1266 logging_warn( "received overlay update message for link for "
1267 << "which we have no mapping" );
1268 return false;
1269 }
1270 logging_info("Received type update message on link " << ld );
1271
1272 // update our link mapping information for this link
1273 bool changed =
1274 ( ld->remoteNode != overlayMsg->getSourceNode() )
1275 || ( ld->service != overlayMsg->getService() );
1276
1277 // set parameters
1278 ld->up = true;
1279 ld->remoteNode = overlayMsg->getSourceNode();
1280 ld->remoteLink = overlayMsg->getSourceLink();
1281 ld->service = overlayMsg->getService();
1282 ld->autolink = overlayMsg->isAutoLink();
1283
1284 // if our link information changed, we send out an update, too
1285 if( changed ) {
1286 overlayMsg->swapRoles();
1287 overlayMsg->setSourceNode(nodeId);
1288 overlayMsg->setSourceLink(ld->overlayId);
1289 overlayMsg->setService(ld->service);
1290 send( overlayMsg, ld );
1291 }
1292
1293 // service registered? no-> error!
1294 if( !communicationListeners.contains( ld->service ) ) {
1295 logging_warn( "Link up: event listener has not been registered" );
1296 return false;
1297 }
1298
1299 // default or no service registered?
1300 CommunicationListener* listener = communicationListeners.get( ld->service );
1301 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1302 logging_warn("Link up: event listener is default or null!" );
1303 return true;
1304 }
1305
1306 // update descriptor
1307 ld->listener = listener;
1308 ld->setAutoUsed();
1309 ld->setAlive();
1310
1311 // ask the service whether it wants to accept this link
1312 if( !listener->onLinkRequest(ld->remoteNode) ) {
1313
1314 logging_debug("Link id=" << ld->overlayId.toString() <<
1315 " has been denied by service " << ld->service.toString() << ", dropping link");
1316
1317 // prevent onLinkDown calls to the service
1318 ld->listener = &CommunicationListener::DEFAULT;
1319
1320 // drop the link
1321 dropLink( ld->overlayId );
1322 return true;
1323 }
1324
1325 // set link up
1326 ld->up = true;
1327 logging_info( "Link has been accepted by service and is up: " << ld );
1328
1329 // auto links: link has been accepted -> send queued messages
1330 if( ld->messageQueue.size() > 0 ) {
1331 logging_info( "Sending out queued messages on link " << ld );
1332 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1333 sendMessage( msg, ld->overlayId );
1334 delete msg;
1335 }
1336 ld->messageQueue.clear();
1337 }
1338
1339 // call the notification functions
1340 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1341 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1342
1343 return true;
1344}
1345
1346/// handle a link request and reply
1347bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1348 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1349
1350 //TODO: Check if a request has already been sent using getSourceLink() ...
1351
1352 // create link descriptor
1353 LinkDescriptor* ldn = addDescriptor();
1354
1355 // flags
1356 ldn->up = true;
1357 ldn->fromRemote = true;
1358 ldn->relayed = true;
1359
1360 // parameters
1361 ldn->service = overlayMsg->getService();
1362 ldn->listener = getListener(ldn->service);
1363 ldn->remoteNode = overlayMsg->getSourceNode();
1364 ldn->remoteLink = overlayMsg->getSourceLink();
1365
1366 // update time-stamps
1367 ldn->setAlive();
1368 ldn->setAutoUsed();
1369
1370 // create reply message and send back!
1371 overlayMsg->swapRoles(); // swap source/destination
1372 overlayMsg->setType(OverlayMsg::typeLinkReply);
1373 overlayMsg->setSourceLink(ldn->overlayId);
1374 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1375 overlayMsg->setRelayed(true);
1376 send( overlayMsg, ld ); // send back to link
1377
1378 // inform listener
1379 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1380
1381 return true;
1382}
1383
1384bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1385
1386 // find link request
1387 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1388
1389 // not found? yes-> drop with error!
1390 if (ldn == NULL) {
1391 logging_error( "No link request pending for "
1392 << overlayMsg->getDestinationLink().toString() );
1393 return false;
1394 }
1395 logging_debug("Handling link reply for " << ldn )
1396
1397 // check if already up
1398 if (ldn->up) {
1399 logging_warn( "Link already up: " << ldn );
1400 return true;
1401 }
1402
1403 // debug message
1404 logging_debug( "Link request reply received. Establishing link"
1405 << " for service " << overlayMsg->getService().toString()
1406 << " with local id=" << overlayMsg->getDestinationLink()
1407 << " and remote link id=" << overlayMsg->getSourceLink()
1408 << " to " << overlayMsg->getSourceEndpoint().toString()
1409 );
1410
1411 // set local link descriptor data
1412 ldn->up = true;
1413 ldn->relayed = true;
1414 ldn->service = overlayMsg->getService();
1415 ldn->listener = getListener(ldn->service);
1416 ldn->remoteLink = overlayMsg->getSourceLink();
1417 ldn->remoteNode = overlayMsg->getSourceNode();
1418
1419 // update timestamps
1420 ldn->setAlive();
1421 ldn->setAutoUsed();
1422
1423 // auto links: link has been accepted -> send queued messages
1424 if( ldn->messageQueue.size() > 0 ) {
1425 logging_info( "Sending out queued messages on link " <<
1426 ldn->overlayId.toString() );
1427 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1428 sendMessage( msg, ldn->overlayId );
1429 delete msg;
1430 }
1431 ldn->messageQueue.clear();
1432 }
1433
1434 // inform listeners about new link
1435 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1436
1437 // try to replace relay link with direct link
1438 ldn->communicationId =
1439 bc->establishLink( overlayMsg->getSourceEndpoint() );
1440
1441 return true;
1442}
1443
1444/// handle a keep-alive message for a link
1445bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1446 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1447 if ( rld != NULL ) {
1448 logging_debug("Keep-Alive for " <<
1449 overlayMsg->getDestinationLink() );
1450 if (overlayMsg->isRouteRecord())
1451 rld->routeRecord = overlayMsg->getRouteRecord();
1452 rld->setAlive();
1453 return true;
1454 } else {
1455 logging_error("Keep-Alive for "
1456 << overlayMsg->getDestinationLink() << ": link unknown." );
1457 return false;
1458 }
1459}
1460
1461/// handle a direct link message
1462bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1463 logging_debug( "Received direct link replacement request" );
1464
1465 /// get destination overlay link
1466 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1467 if (rld == NULL || ld == NULL) {
1468 logging_error("Direct link replacement: Link "
1469 << overlayMsg->getDestinationLink() << "not found error." );
1470 return false;
1471 }
1472 logging_info( "Received direct link convert notification for " << rld );
1473
1474 // update information
1475 rld->communicationId = ld->communicationId;
1476 rld->communicationUp = true;
1477 rld->relayed = false;
1478
1479 // mark used and alive!
1480 rld->setAlive();
1481 rld->setAutoUsed();
1482
1483 // erase the original descriptor
1484 eraseDescriptor(ld->overlayId);
1485}
1486
1487/// handles an incoming message
1488bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1489 const LinkID bcLink ) {
1490 logging_debug( "Handling message: " << message->toString());
1491
1492 // decapsulate overlay message
1493 OverlayMsg* overlayMsg =
1494 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1495 if( overlayMsg == NULL ) return false;
1496
1497 // increase number of hops
1498 overlayMsg->increaseNumHops();
1499
1500 // refresh relay information
1501 refreshRelayInformation( overlayMsg, ld );
1502
1503 // update route record
1504 overlayMsg->addRouteRecord(nodeId);
1505
1506 // handle signaling messages (do not route!)
1507 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1508 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1509 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1510 delete overlayMsg;
1511 return true;
1512 }
1513
1514 // message for reached destination? no-> route message
1515 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1516 overlayMsg->getDestinationNode() != nodeId ) {
1517 logging_debug("Routing message "
1518 << " from " << overlayMsg->getSourceNode()
1519 << " to " << overlayMsg->getDestinationNode()
1520 );
1521 route( overlayMsg );
1522 delete overlayMsg;
1523 return true;
1524 }
1525
1526 // handle base overlay message
1527 bool ret = false; // return value
1528 switch ( overlayMsg->getType() ) {
1529
1530 // data transport messages
1531 case OverlayMsg::typeData:
1532 ret = handleData(overlayMsg, ld); break;
1533
1534 // overlay setup messages
1535 case OverlayMsg::typeJoinRequest:
1536 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1537 case OverlayMsg::typeJoinReply:
1538 ret = handleJoinReply(overlayMsg, bcLink ); break;
1539
1540 // link specific messages
1541 case OverlayMsg::typeLinkRequest:
1542 ret = handleLinkRequest(overlayMsg, ld ); break;
1543 case OverlayMsg::typeLinkReply:
1544 ret = handleLinkReply(overlayMsg, ld ); break;
1545 case OverlayMsg::typeLinkUpdate:
1546 ret = handleLinkUpdate(overlayMsg, ld ); break;
1547 case OverlayMsg::typeLinkAlive:
1548 ret = handleLinkAlive(overlayMsg, ld ); break;
1549 case OverlayMsg::typeLinkDirect:
1550 ret = handleLinkDirect(overlayMsg, ld ); break;
1551
1552 // handle unknown message type
1553 default: {
1554 logging_error( "received message in invalid state! don't know " <<
1555 "what to do with this message of type " << overlayMsg->getType() );
1556 ret = false;
1557 break;
1558 }
1559 }
1560
1561 // free overlay message and return value
1562 delete overlayMsg;
1563 return ret;
1564}
1565
1566// ----------------------------------------------------------------------------
1567
1568void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1569
1570 logging_debug( "broadcasting message to all known nodes " <<
1571 "in the overlay from service " + service.toString() );
1572
1573 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1574 OverlayInterface::NodeList::iterator i = nodes.begin();
1575 for(; i != nodes.end(); i++ ) {
1576 if( *i == nodeId) continue; // don't send to ourselfs
1577 sendMessage( message, *i, service );
1578 }
1579}
1580
1581/// return the overlay neighbors
1582vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1583 // the known nodes _can_ also include our node, so we remove ourself
1584 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1585 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1586 if( i != nodes.end() ) nodes.erase( i );
1587 return nodes;
1588}
1589
1590const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1591 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1592 const LinkDescriptor* ld = getDescriptor(lid);
1593 if( ld == NULL ) return NodeID::UNSPECIFIED;
1594 else return ld->remoteNode;
1595}
1596
1597vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1598 vector<LinkID> linkvector;
1599 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1600 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1601 linkvector.push_back( ld->overlayId );
1602 }
1603 }
1604 return linkvector;
1605}
1606
1607
1608void BaseOverlay::onNodeJoin(const NodeID& node) {
1609 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1610 if( i == joiningNodes.end() ) return;
1611
1612 logging_info( "node has successfully joined baseoverlay and overlay structure "
1613 << node.toString() );
1614
1615 joiningNodes.erase( i );
1616}
1617
1618void BaseOverlay::eventFunction() {
1619 stabilizeRelays();
1620 stabilizeLinks();
1621}
1622
1623}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.