source: source/ariba/overlay/BaseOverlay.cpp@ 5882

Last change on this file since 5882 was 5882, checked in by mies, 15 years ago
File size: 47.6 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/misc/OvlVis.h"
57
58namespace ariba {
59namespace overlay {
60
61/* *****************************************************************************
62 * PREREQUESITES
63 * ****************************************************************************/
64
65CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
66 if( !communicationListeners.contains( service ) ) {
67 logging_error( "No listener found for service " << service.toString() );
68 return NULL;
69 }
70 CommunicationListener* listener = communicationListeners.get( service );
71 assert( listener != NULL );
72 return listener;
73}
74
75// link descriptor handling ----------------------------------------------------
76
77LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
78 BOOST_FOREACH( LinkDescriptor* lp, links )
79 if ((communication ? lp->communicationId : lp->overlayId) == link)
80 return lp;
81 return NULL;
82}
83
84const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
85 BOOST_FOREACH( const LinkDescriptor* lp, links )
86 if ((communication ? lp->communicationId : lp->overlayId) == link)
87 return lp;
88 return NULL;
89}
90
91/// erases a link descriptor
92void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
93 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
94 LinkDescriptor* ld = *i;
95 if ((communication ? ld->communicationId : ld->overlayId) == link) {
96 delete ld;
97 links.erase(i);
98 break;
99 }
100 }
101}
102
103/// adds a link descriptor
104LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
105 LinkDescriptor* desc = getDescriptor( link );
106 if ( desc == NULL ) {
107 desc = new LinkDescriptor();
108 if (!link.isUnspecified()) desc->overlayId = link;
109 links.push_back(desc);
110 }
111 return desc;
112}
113
114/// returns a auto-link descriptor
115LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
116 // search for a descriptor that is already up
117 BOOST_FOREACH( LinkDescriptor* lp, links )
118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
119 return lp;
120 // if not found, search for one that is about to come up...
121 BOOST_FOREACH( LinkDescriptor* lp, links )
122 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
123 return lp;
124 return NULL;
125}
126
127/// stabilizes link information
128void BaseOverlay::stabilizeLinks() {
129 // send keep-alive messages over established links
130 BOOST_FOREACH( LinkDescriptor* ld, links ) {
131 if (!ld->up) continue;
132 OverlayMsg msg( OverlayMsg::typeLinkAlive,
133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
134 msg.setRelayed(true);
135 if (ld->relayed) msg.setRouteRecord(true);
136 send_link( &msg, ld->overlayId );
137 }
138
139 // iterate over all links and check for time boundaries
140 vector<LinkDescriptor*> oldlinks;
141 time_t now = time(NULL);
142 BOOST_FOREACH( LinkDescriptor* ld, links ) {
143 // remote used as relay flag
144 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
145 ld->relaying = false;
146
147 // keep alives and not up? yes-> link connection request is stale!
148 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
149
150 // increase counter
151 ld->keepAliveMissed++;
152
153 // missed more than four keep-alive messages (10 sec)? -> drop link
154 if (ld->keepAliveMissed > 4) {
155 logging_info( "Link connection request is stale, closing: " << ld );
156 oldlinks.push_back( ld );
157 continue;
158 }
159 }
160
161 if (!ld->up) continue;
162
163 // drop links that are dropped and not used as relay
164 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
165 oldlinks.push_back( ld );
166 continue;
167 }
168
169 // auto-link time exceeded?
170 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
171 oldlinks.push_back( ld );
172 continue;
173 }
174
175 // keep alives missed? yes->
176 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
177
178 // increase counter
179 ld->keepAliveMissed++;
180
181 // missed more than four keep-alive messages (4 sec)? -> drop link
182 if (ld->keepAliveMissed >= 4) {
183 logging_info( "Link is stale, closing: " << ld );
184 oldlinks.push_back( ld );
185 continue;
186 }
187 }
188 }
189
190 // drop links
191 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
192 logging_info( "Link timed out. Dropping " << ld );
193 ld->relaying = false;
194 dropLink( ld->overlayId );
195 }
196
197 // show link state
198 counter++;
199 if (counter>=4) showLinks();
200 if (counter>=4 || counter<0) counter = 0;
201}
202
203/// shows the current link state
204void BaseOverlay::showLinks() {
205 int i=0;
206 logging_info("--- link state -------------------------------");
207 BOOST_FOREACH( LinkDescriptor* ld, links ) {
208 logging_info("link " << i << ": " << ld);
209 i++;
210 }
211 logging_info("----------------------------------------------");
212}
213
214/// compares two arbitrary links to the same node
215int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
216 LinkDescriptor* lhsld = getDescriptor(lhs);
217 LinkDescriptor* rhsld = getDescriptor(rhs);
218 if (lhsld==NULL || rhsld==NULL
219 || !lhsld->up || !rhsld->up
220 || lhsld->remoteNode != rhsld->remoteNode) return -1;
221
222 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
223 return -1;
224
225 return 1;
226}
227
228
229// internal message delivery ---------------------------------------------------
230
231/// routes a message to its destination node
232void BaseOverlay::route( OverlayMsg* message ) {
233
234 // exceeded time-to-live? yes-> drop message
235 if (message->getNumHops() > message->getTimeToLive()) {
236 logging_warn("Message exceeded TTL -> drop message!");
237 return;
238
239 // no-> forward message
240 } else {
241 // destinastion myself? yes-> handle message
242 if (message->getDestinationNode() == nodeId)
243 handleMessage( message, NULL );
244 else
245 // no->send message to next hop
246 send( message, message->getDestinationNode() );
247 }
248}
249
250/// sends a message to another node, delivers it to the base overlay class
251seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
252 LinkDescriptor* next_link = NULL;
253
254 // drop messages to unspecified destinations
255 if (destination.isUnspecified()) return -1;
256
257 // send messages to myself -> handle message and drop warning!
258 if (destination == nodeId) {
259 logging_warn("Sent message to myself. Handling message.")
260 Message msg;
261 msg.encapsulate(message);
262 handleMessage( &msg, NULL );
263 return -1;
264 }
265
266 // relay path known? yes-> send over relay link
267 next_link = getRelayLinkTo( destination );
268 if (next_link != NULL) {
269 next_link->setRelaying();
270 return send(message, next_link);
271 }
272
273 // no-> relay path! route over overlay path
274 LinkID next_id = overlayInterface->getNextLinkId( destination );
275 if (next_id.isUnspecified()) {
276 logging_error("Could not send message. No next hop found to " << destination );
277 return -1;
278 }
279
280 // get link descriptor, up and running? yes-> send message
281 next_link = getDescriptor(next_id);
282 if (next_link != NULL && next_link->up)
283 return send(message, next_link);
284
285 // no-> error, dropping message
286 else {
287 logging_error("Could not send message. Link not known or up");
288 return -1;
289 }
290
291 // not reached-> fail
292 return -1;
293}
294
295/// send a message using a link descriptor, delivers it to the base overlay class
296seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ld, bool ignore_down ) {
297 assert(ld!=NULL);
298
299 // check if up
300 if (!ld->up && !ignore_down) {
301 logging_error("Can not send message. Link not up:" << ld );
302 return -1;
303 }
304
305 // handle relayed link
306 if (ld->relayed) {
307 logging_debug("send(): Resolving direct link for relayed link to "
308 << ld->remoteNode);
309 ld = getRelayLinkTo( ld->remoteNode );
310 if (ld==NULL) {
311 logging_error("Direct link not found.");
312 return -1;
313 }
314 message->setRelayed();
315 ld->setRelaying();
316 }
317
318 // handle direct link
319 if (ld->communicationUp) {
320 logging_debug("send(): Sending message over direct link.");
321 return bc->sendMessage( ld->communicationId, message );
322 } else {
323 logging_error("send(): Could not send mesage. "
324 "Not a relayed link and direct link is not up.");
325 return -1;
326 }
327 return -1;
328}
329
330seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
331 const ServiceID& service) {
332 message->setSourceNode(nodeId);
333 message->setService(service);
334 message->setDestinationNode(remote);
335 send( message, remote );
336}
337
338seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
339 LinkDescriptor* ld = getDescriptor(link);
340 if (ld==NULL) {
341 logging_error("Cannot find descriptor to link id=" << link.toString());
342 return -1;
343 }
344 message->setSourceNode(nodeId);
345 message->setSourceLink(ld->overlayId);
346 message->setService(ld->service);
347 message->setDestinationNode(ld->remoteNode);
348 message->setDestinationLink(ld->remoteLink);
349 return send( message, ld, ignore_down );
350}
351
352// relay route management ------------------------------------------------------
353
354/// stabilize relay information
355void BaseOverlay::stabilizeRelays() {
356 vector<relay_route>::iterator i = relay_routes.begin();
357 while (i!=relay_routes.end() ) {
358 relay_route& route = *i;
359 LinkDescriptor* ld = getDescriptor(route.link);
360 if (ld==NULL
361 || !ld->up
362 || difftime(route.used, time(NULL)) > 4) {
363 logging_info("Forgetting relay information to node "
364 << route.node.toString() );
365 i = relay_routes.erase(i);
366 } else
367 i++;
368 }
369}
370
371void BaseOverlay::removeRelayLink( const LinkID& link ) {
372 vector<relay_route>::iterator i = relay_routes.begin();
373 while (i!=relay_routes.end() ) {
374 relay_route& route = *i;
375 if (route.link == link ) i = relay_routes.erase(i); else i++;
376 }
377}
378
379void BaseOverlay::removeRelayNode( const NodeID& remote ) {
380 vector<relay_route>::iterator i = relay_routes.begin();
381 while (i!=relay_routes.end() ) {
382 relay_route& route = *i;
383 if (route.node == remote ) i = relay_routes.erase(i); else i++;
384 }
385}
386
387/// refreshes relay information
388void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
389
390 // handle relayed messages from real links only
391 if (ld == NULL
392 || ld->relayed
393 || !message->isRelayed()
394// || !(message->getService() == OverlayInterface::OVERLAY_SERVICE_ID)
395 || message->getSourceNode()==nodeId ) return;
396
397 // check wheter this node is already part of the routing table
398 LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode());
399 if (next_link == ld->overlayId) return;
400 ld->setRelaying();
401
402 // try to find source node
403 BOOST_FOREACH( relay_route& route, relay_routes ) {
404
405 // relay route found? yes->
406 if ( route.node == message->getSourceNode() ) {
407
408 // refresh timer
409 route.used = time(NULL);
410
411 // route has a shorter hop count? yes-> replace
412 if (route.hops > message->getNumHops() ) {
413 logging_info("Updating relay information to node "
414 << route.node.toString()
415 << " reducing to " << message->getNumHops() << " hops.");
416 route.hops = message->getNumHops();
417 route.link = ld->overlayId;
418 }
419 return;
420 }
421 }
422
423 // not found-> add new entry
424 relay_route route;
425 route.hops = message->getNumHops();
426 route.link = ld->overlayId;
427 route.node = message->getSourceNode();
428 route.used = time(NULL);
429 logging_info("Remembering relay information to node " << route.node.toString());
430 relay_routes.push_back(route);
431}
432
433/// returns a known "vital" relay link which is up and running
434LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
435 // try to find source node
436 BOOST_FOREACH( relay_route& route, relay_routes ) {
437 if (route.node == remote ) {
438 LinkDescriptor* ld = getDescriptor( route.link );
439 if (ld==NULL || !ld->up) return NULL; else return ld;
440 }
441 }
442 return NULL;
443}
444
445/* *****************************************************************************
446 * PUBLIC MEMBERS
447 * ****************************************************************************/
448
449use_logging_cpp(BaseOverlay);
450
451// ----------------------------------------------------------------------------
452
453BaseOverlay::BaseOverlay() :
454 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
455 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
456 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
457}
458
459BaseOverlay::~BaseOverlay() {
460}
461
462// ----------------------------------------------------------------------------
463
464void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
465 logging_info("Starting...");
466
467 // set parameters
468 bc = &_basecomm;
469 nodeId = _nodeid;
470
471 // register at base communication
472 bc->registerMessageReceiver( this );
473 bc->registerEventListener( this );
474
475 // timer for auto link management
476 Timer::setInterval( 1000 );
477 Timer::start();
478
479 started = true;
480 state = BaseOverlayStateInvalid;
481}
482
483void BaseOverlay::stop() {
484 logging_info("Stopping...");
485
486 // stop timer
487 Timer::stop();
488
489 // delete oberlay interface
490 if(overlayInterface != NULL) {
491 delete overlayInterface;
492 overlayInterface = NULL;
493 }
494
495 // unregister at base communication
496 bc->unregisterMessageReceiver( this );
497 bc->unregisterEventListener( this );
498
499 started = false;
500 state = BaseOverlayStateInvalid;
501}
502
503bool BaseOverlay::isStarted(){
504 return started;
505}
506
507// ----------------------------------------------------------------------------
508
509void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
510 const EndpointDescriptor& bootstrapEp) {
511
512 if(id != spovnetId){
513 logging_error("attempt to join against invalid spovnet, call initiate first");
514 return;
515 }
516
517
518 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
519 logging_info( "Starting to join spovnet " << id.toString() <<
520 " with nodeid " << nodeId.toString());
521
522 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
523
524 // bootstrap against ourselfs
525 logging_debug("joining spovnet locally");
526
527 overlayInterface->joinOverlay();
528 state = BaseOverlayStateCompleted;
529 BOOST_FOREACH( NodeListener* i, nodeListeners )
530 i->onJoinCompleted( spovnetId );
531
532 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
533 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
534
535 logging_debug("starting overlay bootstrap module");
536 overlayBootstrap.start(this, spovnetId, nodeId);
537 overlayBootstrap.publish(bc->getEndpointDescriptor());
538
539 } else {
540
541 // bootstrap against another node
542 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
543
544 const LinkID& lnk = bc->establishLink( bootstrapEp );
545 bootstrapLinks.push_back(lnk);
546 logging_info("join process initiated for " << id.toString() << "...");
547 }
548}
549
550void BaseOverlay::leaveSpoVNet() {
551
552 logging_info( "Leaving spovnet " << spovnetId );
553 bool ret = ( state != this->BaseOverlayStateInvalid );
554
555 logging_debug("stopping overlay bootstrap module");
556 overlayBootstrap.stop();
557 overlayBootstrap.revoke();
558
559 logging_debug( "Dropping all auto-links" );
560
561 // gather all service links
562 vector<LinkID> servicelinks;
563 BOOST_FOREACH( LinkDescriptor* ld, links ) {
564 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
565 servicelinks.push_back( ld->overlayId );
566 }
567
568 // drop all service links
569 BOOST_FOREACH( LinkID lnk, servicelinks )
570 dropLink( lnk );
571
572 // let the node leave the spovnet overlay interface
573 logging_debug( "Leaving overlay" );
574 if( overlayInterface != NULL )
575 overlayInterface->leaveOverlay();
576
577 // drop still open bootstrap links
578 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
579 bc->dropLink( lnk );
580
581 // change to inalid state
582 state = BaseOverlayStateInvalid;
583 //ovl.visShutdown( ovlId, nodeId, string("") );
584
585 // inform all registered services of the event
586 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
587 if( ret ) i->onLeaveCompleted( spovnetId );
588 else i->onLeaveFailed( spovnetId );
589 }
590}
591
592void BaseOverlay::createSpoVNet(const SpoVNetID& id,
593 const OverlayParameterSet& param,
594 const SecurityParameterSet& sec,
595 const QoSParameterSet& qos) {
596
597 // set the state that we are an initiator, this way incoming messages are
598 // handled correctly
599 logging_info( "creating spovnet " + id.toString() <<
600 " with nodeid " << nodeId.toString() );
601
602 spovnetId = id;
603
604 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
605 if( overlayInterface == NULL ) {
606 logging_fatal( "overlay structure not supported" );
607 state = BaseOverlayStateInvalid;
608
609 BOOST_FOREACH( NodeListener* i, nodeListeners )
610 i->onJoinFailed( spovnetId );
611
612 return;
613 }
614}
615
616// ----------------------------------------------------------------------------
617
618const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
619 const NodeID& remoteId, const ServiceID& service ) {
620
621 // establish link via overlay
622 if (!remoteId.isUnspecified())
623 return establishLink( remoteId, service );
624 else
625
626 // establish link directly if only ep is known
627 if (remoteId.isUnspecified())
628 return establishDirectLink(remoteEp, service );
629
630}
631
632/// call base communication's establish link and add link mapping
633const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
634 const ServiceID& service ) {
635
636 /// find a service listener
637 if( !communicationListeners.contains( service ) ) {
638 logging_error( "No listener registered for service id=" << service.toString() );
639 return LinkID::UNSPECIFIED;
640 }
641 CommunicationListener* listener = communicationListeners.get( service );
642 assert( listener != NULL );
643
644 // create descriptor
645 LinkDescriptor* ld = addDescriptor();
646 ld->relayed = false;
647 ld->listener = listener;
648 ld->service = service;
649 ld->communicationId = bc->establishLink( ep );
650
651 /// establish link and add mapping
652 logging_info("Establishing direct link " << ld->communicationId.toString()
653 << " using " << ep.toString());
654
655 return ld->communicationId;
656}
657
658/// establishes a link between two arbitrary nodes
659const LinkID BaseOverlay::establishLink( const NodeID& remote,
660 const ServiceID& service ) {
661
662 // do not establish a link to myself!
663 if (remote == nodeId) return LinkID::UNSPECIFIED;
664
665 // create a link descriptor
666 LinkDescriptor* ld = addDescriptor();
667 ld->relayed = true;
668 ld->remoteNode = remote;
669 ld->service = service;
670 ld->listener = getListener(ld->service);
671
672 // create link request message
673 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
674 msg.setSourceLink(ld->overlayId);
675 msg.setRelayed(true);
676
677 // debug message
678 logging_info(
679 "Sending link request with"
680 << " link=" << ld->overlayId.toString()
681 << " node=" << ld->remoteNode.toString()
682 << " serv=" << ld->service.toString()
683 );
684
685 // sending message to node
686 send_node( &msg, ld->remoteNode, ld->service );
687
688 return ld->overlayId;
689}
690
691/// drops an established link
692void BaseOverlay::dropLink(const LinkID& link) {
693 logging_info( "Dropping link (initiated locally):" << link.toString() );
694
695 // find the link item to drop
696 LinkDescriptor* ld = getDescriptor(link);
697 if( ld == NULL ) {
698 logging_warn( "Can't drop link, link is unknown!");
699 return;
700 }
701
702 // delete all queued messages
703 if( ld->messageQueue.size() > 0 ) {
704 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
705 << ld->messageQueue.size() << " waiting messages" );
706 ld->flushQueue();
707 }
708
709 // inform sideport and listener
710 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
711 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
712
713 // do not drop relay links
714 if (!ld->relaying) {
715 // drop the link in base communication
716 if (ld->communicationUp) bc->dropLink( ld->communicationId );
717
718 // erase descriptor
719 eraseDescriptor( ld->overlayId );
720 } else {
721 ld->dropAfterRelaying = true;
722 }
723}
724
725// ----------------------------------------------------------------------------
726
727/// internal send message, always use this functions to send messages over links
728seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
729 logging_debug( "Sending data message on link " << link.toString() );
730
731 // get the mapping for this link
732 LinkDescriptor* ld = getDescriptor(link);
733 if( ld == NULL ) {
734 logging_error("Could not send message. "
735 << "Link not found id=" << link.toString());
736 return -1;
737 }
738
739 // check if the link is up yet, if its an auto link queue message
740 if( !ld->up ) {
741 ld->setAutoUsed();
742 if( ld->autolink ) {
743 logging_info("Auto-link " << link.toString() << " not up, queue message");
744 Data data = data_serialize( message );
745 const_cast<Message*>(message)->dropPayload();
746 ld->messageQueue.push_back( new Message(data) );
747 } else {
748 logging_error("Link " << link.toString() << " not up, drop message");
749 }
750 return -1;
751 }
752
753 // compile overlay message (has service and node id)
754 OverlayMsg overmsg( OverlayMsg::typeData );
755 overmsg.encapsulate( const_cast<Message*>(message) );
756
757 // send message over relay/direct/overlay
758 return send_link( &overmsg, ld->overlayId );
759}
760
761seqnum_t BaseOverlay::sendMessage(const Message* message,
762 const NodeID& node, const ServiceID& service) {
763
764 // find link for node and service
765 LinkDescriptor* ld = getAutoDescriptor( node, service );
766
767 // if we found no link, create an auto link
768 if( ld == NULL ) {
769
770 // debug output
771 logging_info( "No link to send message to node "
772 << node.toString() << " found for service "
773 << service.toString() << ". Creating auto link ..."
774 );
775
776 // call base overlay to create a link
777 LinkID link = establishLink( node, service );
778 ld = getDescriptor( link );
779 if( ld == NULL ) {
780 logging_error( "Failed to establish auto-link.");
781 return -1;
782 }
783 ld->autolink = true;
784
785 logging_debug( "Auto-link establishment in progress to node "
786 << node.toString() << " with link id=" << link.toString() );
787 }
788 assert(ld != NULL);
789
790 // mark the link as used, as we now send a message through it
791 ld->setAutoUsed();
792
793 // send / queue message
794 return sendMessage( message, ld->overlayId );
795}
796
797// ----------------------------------------------------------------------------
798
799const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
800 const LinkID& link) const {
801
802 // return own end-point descriptor
803 if( link == LinkID::UNSPECIFIED )
804 return bc->getEndpointDescriptor();
805
806 // find link descriptor. not found -> return unspecified
807 const LinkDescriptor* ld = getDescriptor(link);
808 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
809
810 // return endpoint-descriptor from base communication
811 return bc->getEndpointDescriptor( ld->communicationId );
812}
813
814const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
815 const NodeID& node) const {
816
817 // return own end-point descriptor
818 if( node == nodeId || node == NodeID::UNSPECIFIED )
819 return bc->getEndpointDescriptor();
820
821 // no joined and request remote descriptor? -> fail!
822 if( overlayInterface == NULL ) {
823 logging_error( "overlay interface not set, cannot resolve endpoint" );
824 return EndpointDescriptor::UNSPECIFIED();
825 }
826
827 // resolve end-point descriptor from the base-overlay routing table
828 return overlayInterface->resolveNode( node );
829}
830
831// ----------------------------------------------------------------------------
832
833bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
834 sideport = _sideport;
835 _sideport->configure( this );
836}
837
838bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
839 sideport = &SideportListener::DEFAULT;
840}
841
842// ----------------------------------------------------------------------------
843
844bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
845 logging_debug( "binding communication listener " << listener
846 << " on serviceid " << sid.toString() );
847
848 if( communicationListeners.contains( sid ) ) {
849 logging_error( "some listener already registered for service id "
850 << sid.toString() );
851 return false;
852 }
853
854 communicationListeners.registerItem( listener, sid );
855 return true;
856}
857
858
859bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
860 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
861
862 if( !communicationListeners.contains( sid ) ) {
863 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
864 return false;
865 }
866
867 if( communicationListeners.get(sid) != listener ) {
868 logging_warn( "listener bound to service id " << sid.toString()
869 << " is different than listener trying to unbind" );
870 return false;
871 }
872
873 communicationListeners.unregisterItem( sid );
874 return true;
875}
876
877// ----------------------------------------------------------------------------
878
879bool BaseOverlay::bind(NodeListener* listener) {
880 logging_debug( "Binding node listener " << listener );
881
882 // already bound? yes-> warning
883 NodeListenerVector::iterator i =
884 find( nodeListeners.begin(), nodeListeners.end(), listener );
885 if( i != nodeListeners.end() ) {
886 logging_warn("Node listener " << listener << " is already bound!" );
887 return false;
888 }
889
890 // no-> add
891 nodeListeners.push_back( listener );
892 return true;
893}
894
895bool BaseOverlay::unbind(NodeListener* listener) {
896 logging_debug( "Unbinding node listener " << listener );
897
898 // already unbound? yes-> warning
899 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
900 if( i == nodeListeners.end() ) {
901 logging_warn( "Node listener " << listener << " is not bound!" );
902 return false;
903 }
904
905 // no-> remove
906 nodeListeners.erase( i );
907 return true;
908}
909
910// ----------------------------------------------------------------------------
911
912void BaseOverlay::onLinkUp(const LinkID& id,
913 const address_v* local, const address_v* remote) {
914 logging_debug( "Link up with base communication link id=" << id );
915
916 // get descriptor for link
917 LinkDescriptor* ld = getDescriptor(id, true);
918
919 // handle bootstrap link we initiated
920 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
921 logging_info(
922 "Join has been initiated by me and the link is now up. " <<
923 "Sending out join request for SpoVNet " << spovnetId.toString()
924 );
925
926 // send join request message
927 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
928 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
929 JoinRequest joinRequest( spovnetId, nodeId );
930 overlayMsg.encapsulate( &joinRequest );
931 bc->sendMessage( id, &overlayMsg );
932 return;
933 }
934
935 // no link found? -> link establishment from remote, add one!
936 if (ld == NULL) {
937 ld = addDescriptor( id );
938 logging_info( "onLinkUp (remote request) descriptor: " << ld );
939
940 // update descriptor
941 ld->fromRemote = true;
942 ld->communicationId = id;
943 ld->communicationUp = true;
944 ld->setAutoUsed();
945 ld->setAlive();
946
947 // in this case, do not inform listener, since service it unknown
948 // -> wait for update message!
949
950 // link mapping found? -> send update message with node-id and service id
951 } else {
952 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
953
954 // update descriptor
955 ld->setAutoUsed();
956 ld->setAlive();
957 ld->communicationUp = true;
958 ld->fromRemote = false;
959
960 // if link is a relayed link->convert to direct link
961 if (ld->relayed) {
962 logging_info( "Converting to direct link: " << ld );
963 ld->up = true;
964 ld->relayed = false;
965 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
966 overMsg.setSourceLink( ld->overlayId );
967 overMsg.setDestinationLink( ld->remoteLink );
968 send_link( &overMsg, ld->overlayId );
969 } else {
970 // note: necessary to validate the link on the remote side!
971 logging_info( "Sending out update" <<
972 " for service " << ld->service.toString() <<
973 " with local node id " << nodeId.toString() <<
974 " on link " << ld->overlayId.toString() );
975
976 // compile and send update message
977 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
978 overlayMsg.setSourceLink(ld->overlayId);
979 overlayMsg.setAutoLink( ld->autolink );
980 send_link( &overlayMsg, ld->overlayId, true );
981 }
982 }
983}
984
985void BaseOverlay::onLinkDown(const LinkID& id,
986 const address_v* local, const address_v* remote) {
987
988 // erase bootstrap links
989 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
990 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
991
992 // get descriptor for link
993 LinkDescriptor* ld = getDescriptor(id, true);
994 if ( ld == NULL ) return; // not found? ->ignore!
995 logging_info( "onLinkDown descriptor: " << ld );
996
997 // removing relay link information
998 removeRelayLink(ld->overlayId);
999
1000 // inform listeners about link down
1001 ld->communicationUp = false;
1002 if (!ld->service.isUnspecified()) {
1003 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1004 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1005 }
1006
1007 // delete all queued messages (auto links)
1008 if( ld->messageQueue.size() > 0 ) {
1009 logging_warn( "Dropping link " << id.toString() << " that has "
1010 << ld->messageQueue.size() << " waiting messages" );
1011 ld->flushQueue();
1012 }
1013
1014 // erase mapping
1015 eraseDescriptor(ld->overlayId);
1016}
1017
1018void BaseOverlay::onLinkChanged(const LinkID& id,
1019 const address_v* oldlocal, const address_v* newlocal,
1020 const address_v* oldremote, const address_v* newremote) {
1021
1022 // get descriptor for link
1023 LinkDescriptor* ld = getDescriptor(id, true);
1024 if ( ld == NULL ) return; // not found? ->ignore!
1025 logging_debug( "onLinkChanged descriptor: " << ld );
1026
1027 // inform listeners
1028 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1029 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1030
1031 // autolinks: refresh timestamp
1032 ld->setAutoUsed();
1033}
1034
1035void BaseOverlay::onLinkFail(const LinkID& id,
1036 const address_v* local, const address_v* remote) {
1037 logging_debug( "Link fail with base communication link id=" << id );
1038
1039 // erase bootstrap links
1040 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1041 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1042
1043 // get descriptor for link
1044 LinkDescriptor* ld = getDescriptor(id, true);
1045 if ( ld == NULL ) return; // not found? ->ignore!
1046 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1047
1048 // inform listeners
1049 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1050 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1051}
1052
1053void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1054 const address_v* remote, const QoSParameterSet& qos) {
1055 logging_debug( "Link quality changed with base communication link id=" << id );
1056
1057 // get descriptor for link
1058 LinkDescriptor* ld = getDescriptor(id, true);
1059 if ( ld == NULL ) return; // not found? ->ignore!
1060 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1061}
1062
1063bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1064 const address_v* remote ) {
1065 logging_debug("Accepting link request from " << remote->to_string() );
1066 return true;
1067}
1068
1069/// handles a message from base communication
1070bool BaseOverlay::receiveMessage(const Message* message,
1071 const LinkID& link, const NodeID& ) {
1072 // get descriptor for link
1073 LinkDescriptor* ld = getDescriptor( link, true );
1074 return handleMessage( message, ld, link );
1075}
1076
1077// ----------------------------------------------------------------------------
1078
1079/// Handle spovnet instance join requests
1080bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1081
1082 // decapsulate message
1083 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1084 logging_info( "Received join request for spovnet " <<
1085 joinReq->getSpoVNetID().toString() );
1086
1087 // check spovnet id
1088 if( joinReq->getSpoVNetID() != spovnetId ) {
1089 logging_error(
1090 "Received join request for spovnet we don't handle " <<
1091 joinReq->getSpoVNetID().toString() );
1092 return false;
1093 }
1094
1095 // TODO: here you can implement mechanisms to deny joining of a node
1096 bool allow = true;
1097 logging_info( "Sending join reply for spovnet " <<
1098 spovnetId.toString() << " to node " <<
1099 overlayMsg->getSourceNode().toString() <<
1100 ". Result: " << (allow ? "allowed" : "denied") );
1101 joiningNodes.push_back( overlayMsg->getSourceNode() );
1102
1103 // return overlay parameters
1104 assert( overlayInterface != NULL );
1105 logging_debug( "Using bootstrap end-point "
1106 << getEndpointDescriptor().toString() )
1107 OverlayParameterSet parameters = overlayInterface->getParameters();
1108 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1109 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1110 JoinReply replyMsg( spovnetId, parameters,
1111 allow, getEndpointDescriptor() );
1112 retmsg.encapsulate(&replyMsg);
1113 bc->sendMessage( bcLink, &retmsg );
1114
1115 return true;
1116}
1117
1118/// Handle replies to spovnet instance join requests
1119bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1120 // decapsulate message
1121 logging_debug("received join reply message");
1122 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1123
1124 // correct spovnet?
1125 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1126 logging_error( "Received SpoVNet join reply for " <<
1127 replyMsg->getSpoVNetID().toString() <<
1128 " != " << spovnetId.toString() );
1129 delete replyMsg;
1130 return false;
1131 }
1132
1133 // access granted? no -> fail
1134 if( !replyMsg->getJoinAllowed() ) {
1135 logging_error( "Our join request has been denied" );
1136
1137 // drop initiator link
1138 if( !bcLink.isUnspecified() ){
1139 bc->dropLink( bcLink );
1140
1141 vector<LinkID>::iterator it = std::find(
1142 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1143 if( it != bootstrapLinks.end() )
1144 bootstrapLinks.erase(it);
1145 }
1146
1147 // inform all registered services of the event
1148 BOOST_FOREACH( NodeListener* i, nodeListeners )
1149 i->onJoinFailed( spovnetId );
1150
1151 delete replyMsg;
1152 return true;
1153 }
1154
1155 // access has been granted -> continue!
1156 logging_info("Join request has been accepted for spovnet " <<
1157 spovnetId.toString() );
1158
1159 logging_debug( "Using bootstrap end-point "
1160 << replyMsg->getBootstrapEndpoint().toString() );
1161
1162 // create overlay structure from spovnet parameter set
1163 // if we have not boostrapped yet against some other node
1164 if( overlayInterface == NULL ){
1165
1166 logging_debug("first-time bootstrapping");
1167
1168 overlayInterface = OverlayFactory::create(
1169 *this, replyMsg->getParam(), nodeId, this );
1170
1171 // overlay structure supported? no-> fail!
1172 if( overlayInterface == NULL ) {
1173 logging_error( "overlay structure not supported" );
1174
1175 if( !bcLink.isUnspecified() ){
1176 bc->dropLink( bcLink );
1177
1178 vector<LinkID>::iterator it = std::find(
1179 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1180 if( it != bootstrapLinks.end() )
1181 bootstrapLinks.erase(it);
1182 }
1183
1184 // inform all registered services of the event
1185 BOOST_FOREACH( NodeListener* i, nodeListeners )
1186 i->onJoinFailed( spovnetId );
1187
1188 delete replyMsg;
1189 return true;
1190 }
1191
1192 // everything ok-> join the overlay!
1193 state = BaseOverlayStateCompleted;
1194 overlayInterface->createOverlay();
1195
1196 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1197 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1198
1199 // update ovlvis
1200 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1201
1202 // inform all registered services of the event
1203 BOOST_FOREACH( NodeListener* i, nodeListeners )
1204 i->onJoinCompleted( spovnetId );
1205
1206 delete replyMsg;
1207
1208 } else {
1209
1210 // this is not the first bootstrap, just join the additional node
1211 logging_debug("not first-time bootstrapping");
1212 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1213 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1214
1215 delete replyMsg;
1216
1217 } // if( overlayInterface == NULL )
1218
1219 return true;
1220}
1221
1222
1223bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1224 // get service
1225 const ServiceID& service = overlayMsg->getService();
1226 logging_debug( "Received data for service " << service.toString()
1227 << " on link " << overlayMsg->getDestinationLink().toString() );
1228
1229 // delegate data message
1230 getListener(service)->onMessage(
1231 overlayMsg,
1232 overlayMsg->getSourceNode(),
1233 overlayMsg->getDestinationLink()
1234 );
1235
1236 return true;
1237}
1238
1239
1240bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1241
1242 if( ld == NULL ) {
1243 logging_warn( "received overlay update message for link for "
1244 << "which we have no mapping" );
1245 return false;
1246 }
1247 logging_info("Received type update message on link " << ld );
1248
1249 // update our link mapping information for this link
1250 bool changed =
1251 ( ld->remoteNode != overlayMsg->getSourceNode() )
1252 || ( ld->service != overlayMsg->getService() );
1253
1254 // set parameters
1255 ld->up = true;
1256 ld->remoteNode = overlayMsg->getSourceNode();
1257 ld->remoteLink = overlayMsg->getSourceLink();
1258 ld->service = overlayMsg->getService();
1259 ld->autolink = overlayMsg->isAutoLink();
1260
1261 // if our link information changed, we send out an update, too
1262 if( changed ) {
1263 overlayMsg->swapRoles();
1264 overlayMsg->setSourceNode(nodeId);
1265 overlayMsg->setSourceLink(ld->overlayId);
1266 overlayMsg->setService(ld->service);
1267 send( overlayMsg, ld );
1268 }
1269
1270 // service registered? no-> error!
1271 if( !communicationListeners.contains( ld->service ) ) {
1272 logging_warn( "Link up: event listener has not been registered" );
1273 return false;
1274 }
1275
1276 // default or no service registered?
1277 CommunicationListener* listener = communicationListeners.get( ld->service );
1278 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1279 logging_warn("Link up: event listener is default or null!" );
1280 return true;
1281 }
1282
1283 // update descriptor
1284 ld->listener = listener;
1285 ld->setAutoUsed();
1286 ld->setAlive();
1287
1288 // ask the service whether it wants to accept this link
1289 if( !listener->onLinkRequest(ld->remoteNode) ) {
1290
1291 logging_debug("Link id=" << ld->overlayId.toString() <<
1292 " has been denied by service " << ld->service.toString() << ", dropping link");
1293
1294 // prevent onLinkDown calls to the service
1295 ld->listener = &CommunicationListener::DEFAULT;
1296
1297 // drop the link
1298 dropLink( ld->overlayId );
1299 return true;
1300 }
1301
1302 // set link up
1303 ld->up = true;
1304 logging_info( "Link has been accepted by service and is up: " << ld );
1305
1306 // auto links: link has been accepted -> send queued messages
1307 if( ld->messageQueue.size() > 0 ) {
1308 logging_info( "Sending out queued messages on link " << ld );
1309 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1310 sendMessage( msg, ld->overlayId );
1311 delete msg;
1312 }
1313 ld->messageQueue.clear();
1314 }
1315
1316 // call the notification functions
1317 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1318 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1319
1320 return true;
1321}
1322
1323/// handle a link request and reply
1324bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1325 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1326
1327 //TODO: Check if a request has already been sent using getSourceLink() ...
1328
1329 // create link descriptor
1330 LinkDescriptor* ldn = addDescriptor();
1331
1332 // flags
1333 ldn->up = true;
1334 ldn->fromRemote = true;
1335 ldn->relayed = true;
1336
1337 // parameters
1338 ldn->service = overlayMsg->getService();
1339 ldn->listener = getListener(ldn->service);
1340 ldn->remoteNode = overlayMsg->getSourceNode();
1341 ldn->remoteLink = overlayMsg->getSourceLink();
1342
1343 // update time-stamps
1344 ldn->setAlive();
1345 ldn->setAutoUsed();
1346
1347 // create reply message and send back!
1348 overlayMsg->swapRoles(); // swap source/destination
1349 overlayMsg->setType(OverlayMsg::typeLinkReply);
1350 overlayMsg->setSourceLink(ldn->overlayId);
1351 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1352 overlayMsg->setRelayed(true);
1353 send( overlayMsg, ld ); // send back to link
1354
1355 // inform listener
1356 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1357
1358 return true;
1359}
1360
1361bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1362
1363 // find link request
1364 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1365
1366 // not found? yes-> drop with error!
1367 if (ldn == NULL) {
1368 logging_error( "No link request pending for "
1369 << overlayMsg->getDestinationLink().toString() );
1370 return false;
1371 }
1372 logging_debug("Handling link reply for " << ldn )
1373
1374 // check if already up
1375 if (ldn->up) {
1376 logging_warn( "Link already up: " << ldn );
1377 return true;
1378 }
1379
1380 // debug message
1381 logging_debug( "Link request reply received. Establishing link"
1382 << " for service " << overlayMsg->getService().toString()
1383 << " with local id=" << overlayMsg->getDestinationLink()
1384 << " and remote link id=" << overlayMsg->getSourceLink()
1385 << " to " << overlayMsg->getSourceEndpoint().toString()
1386 );
1387
1388 // set local link descriptor data
1389 ldn->up = true;
1390 ldn->relayed = true;
1391 ldn->service = overlayMsg->getService();
1392 ldn->listener = getListener(ldn->service);
1393 ldn->remoteLink = overlayMsg->getSourceLink();
1394 ldn->remoteNode = overlayMsg->getSourceNode();
1395
1396 // update timestamps
1397 ldn->setAlive();
1398 ldn->setAutoUsed();
1399
1400 // auto links: link has been accepted -> send queued messages
1401 if( ldn->messageQueue.size() > 0 ) {
1402 logging_info( "Sending out queued messages on link " <<
1403 ldn->overlayId.toString() );
1404 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1405 sendMessage( msg, ldn->overlayId );
1406 delete msg;
1407 }
1408 ldn->messageQueue.clear();
1409 }
1410
1411 // inform listeners about new link
1412 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1413
1414 // try to replace relay link with direct link
1415 ldn->communicationId =
1416 bc->establishLink( overlayMsg->getSourceEndpoint() );
1417
1418 return true;
1419}
1420
1421/// handle a keep-alive message for a link
1422bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1423 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1424 if ( rld != NULL ) {
1425 logging_debug("Keep-Alive for " <<
1426 overlayMsg->getDestinationLink() );
1427 if (overlayMsg->isRouteRecord())
1428 rld->routeRecord = overlayMsg->getRouteRecord();
1429 rld->setAlive();
1430 return true;
1431 } else {
1432 logging_error("Keep-Alive for "
1433 << overlayMsg->getDestinationLink() << ": link unknown." );
1434 return false;
1435 }
1436}
1437
1438/// handle a direct link message
1439bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1440 logging_debug( "Received direct link replacement request" );
1441
1442 /// get destination overlay link
1443 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1444 if (rld == NULL || ld == NULL) {
1445 logging_error("Direct link replacement: Link "
1446 << overlayMsg->getDestinationLink() << "not found error." );
1447 return false;
1448 }
1449 logging_info( "Received direct link convert notification for " << rld );
1450
1451 // update information
1452 rld->communicationId = ld->communicationId;
1453 rld->communicationUp = true;
1454 rld->relayed = false;
1455
1456 // mark used and alive!
1457 rld->setAlive();
1458 rld->setAutoUsed();
1459
1460 // erase the original descriptor
1461 eraseDescriptor(ld->overlayId);
1462}
1463
1464/// handles an incoming message
1465bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1466 const LinkID bcLink ) {
1467 logging_debug( "Handling message: " << message->toString());
1468
1469 // decapsulate overlay message
1470 OverlayMsg* overlayMsg =
1471 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1472 if( overlayMsg == NULL ) return false;
1473
1474 // refresh relay information
1475 refreshRelayInformation( overlayMsg, ld );
1476
1477 // increase number of hops
1478 overlayMsg->increaseNumHops();
1479
1480 // update route record
1481 overlayMsg->addRouteRecord(nodeId);
1482
1483 // handle signaling messages (do not route!)
1484 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1485 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1486 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1487 delete overlayMsg;
1488 return true;
1489 }
1490
1491 // message for reached destination? no-> route message
1492 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1493 overlayMsg->getDestinationNode() != nodeId ) {
1494 logging_debug("Routing message "
1495 << " from " << overlayMsg->getSourceNode()
1496 << " to " << overlayMsg->getDestinationNode()
1497 );
1498
1499 route( overlayMsg );
1500 delete overlayMsg;
1501 return true;
1502 }
1503
1504 // handle base overlay message
1505 bool ret = false; // return value
1506 switch ( overlayMsg->getType() ) {
1507
1508 // data transport messages
1509 case OverlayMsg::typeData:
1510 ret = handleData(overlayMsg, ld); break;
1511
1512 // overlay setup messages
1513 case OverlayMsg::typeJoinRequest:
1514 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1515 case OverlayMsg::typeJoinReply:
1516 ret = handleJoinReply(overlayMsg, bcLink ); break;
1517
1518 // link specific messages
1519 case OverlayMsg::typeLinkRequest:
1520 ret = handleLinkRequest(overlayMsg, ld ); break;
1521 case OverlayMsg::typeLinkReply:
1522 ret = handleLinkReply(overlayMsg, ld ); break;
1523 case OverlayMsg::typeLinkUpdate:
1524 ret = handleLinkUpdate(overlayMsg, ld ); break;
1525 case OverlayMsg::typeLinkAlive:
1526 ret = handleLinkAlive(overlayMsg, ld ); break;
1527 case OverlayMsg::typeLinkDirect:
1528 ret = handleLinkDirect(overlayMsg, ld ); break;
1529
1530 // handle unknown message type
1531 default: {
1532 logging_error( "received message in invalid state! don't know " <<
1533 "what to do with this message of type " << overlayMsg->getType() );
1534 ret = false;
1535 break;
1536 }
1537 }
1538
1539 // free overlay message and return value
1540 delete overlayMsg;
1541 return ret;
1542}
1543
1544// ----------------------------------------------------------------------------
1545
1546void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1547
1548 logging_debug( "broadcasting message to all known nodes " <<
1549 "in the overlay from service " + service.toString() );
1550
1551 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1552 OverlayInterface::NodeList::iterator i = nodes.begin();
1553 for(; i != nodes.end(); i++ ) {
1554 if( *i == nodeId) continue; // don't send to ourselfs
1555 sendMessage( message, *i, service );
1556 }
1557}
1558
1559/// return the overlay neighbors
1560vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1561 // the known nodes _can_ also include our node, so we remove ourself
1562 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1563 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1564 if( i != nodes.end() ) nodes.erase( i );
1565 return nodes;
1566}
1567
1568const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1569 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1570 const LinkDescriptor* ld = getDescriptor(lid);
1571 if( ld == NULL ) return NodeID::UNSPECIFIED;
1572 else return ld->remoteNode;
1573}
1574
1575vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1576 vector<LinkID> linkvector;
1577 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1578 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1579 linkvector.push_back( ld->overlayId );
1580 }
1581 }
1582 return linkvector;
1583}
1584
1585
1586void BaseOverlay::onNodeJoin(const NodeID& node) {
1587 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1588 if( i == joiningNodes.end() ) return;
1589
1590 logging_info( "node has successfully joined baseoverlay and overlay structure "
1591 << node.toString() );
1592
1593 joiningNodes.erase( i );
1594}
1595
1596void BaseOverlay::eventFunction() {
1597 stabilizeRelays();
1598 stabilizeLinks();
1599}
1600
1601}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.