source: source/ariba/overlay/BaseOverlay.cpp@ 5870

Last change on this file since 5870 was 5870, checked in by Christoph Mayer, 15 years ago

merge noch nicht fertig

File size: 52.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/misc/OvlVis.h"
57
58namespace ariba {
59namespace overlay {
60
61/* *****************************************************************************
62 * PREREQUESITES
63 * ****************************************************************************/
64
65CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
66 if( !communicationListeners.contains( service ) ) {
67 logging_error( "No listener found for service " << service.toString() );
68 return NULL;
69 }
70 CommunicationListener* listener = communicationListeners.get( service );
71 assert( listener != NULL );
72 return listener;
73}
74
75// link descriptor handling ----------------------------------------------------
76
77LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
78 BOOST_FOREACH( LinkDescriptor* lp, links )
79 if ((communication ? lp->communicationId : lp->overlayId) == link)
80 return lp;
81 return NULL;
82}
83
84const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
85 BOOST_FOREACH( const LinkDescriptor* lp, links )
86 if ((communication ? lp->communicationId : lp->overlayId) == link)
87 return lp;
88 return NULL;
89}
90
91/// erases a link descriptor
92void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
93 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
94 LinkDescriptor* ld = *i;
95 if ((communication ? ld->communicationId : ld->overlayId) == link) {
96 delete ld;
97 links.erase(i);
98 break;
99 }
100 }
101}
102
103/// adds a link descriptor
104LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
105 LinkDescriptor* desc = getDescriptor( link );
106 if ( desc == NULL ) {
107 desc = new LinkDescriptor();
108 if (!link.isUnspecified()) desc->overlayId = link;
109 links.push_back(desc);
110 }
111 return desc;
112}
113
114/// returns a auto-link descriptor
115LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
116 // search for a descriptor that is already up
117 BOOST_FOREACH( LinkDescriptor* lp, links )
118 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
119 return lp;
120 // if not found, search for one that is about to come up...
121 BOOST_FOREACH( LinkDescriptor* lp, links )
122 if (lp->autolink && lp->remoteNode == node && lp->service == service )
123 return lp;
124 return NULL;
125}
126
127/// stabilizes link information
128void BaseOverlay::stabilizeLinks() {
129 // send keep-alive messages over established links
130 BOOST_FOREACH( LinkDescriptor* ld, links ) {
131 if (!ld->up) continue;
132 OverlayMsg msg( OverlayMsg::typeLinkAlive,
133 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
134 msg.setRelayed(true);
135 if (ld->relayed) msg.setRouteRecord(true);
136 send_link( &msg, ld->overlayId );
137 }
138
139<<<<<<< .working
140 // get used next hop towards node
141 LinkDescriptor* rld = NULL;
142 NodeID relayNode = NodeID::UNSPECIFIED;
143 LinkID rlid = overlayInterface->getNextLinkId(id);
144 if ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
145 // get descriptor of first hop
146 rld = getDescriptor(rlid);
147=======
148 // iterate over all links and check for time boundaries
149 vector<LinkDescriptor*> oldlinks;
150 time_t now = time(NULL);
151 BOOST_FOREACH( LinkDescriptor* ld, links ) {
152 // remote used as relay flag
153 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
154 ld->relaying = false;
155>>>>>>> .merge-rechts.r5869
156
157<<<<<<< .working
158 // is first hop a relay path use local relay
159 if ( rld->relay ) relayNode = rld->localRelay;
160
161 // no-> a proper relay node has been found
162 else relayNode = rld->remoteNode;
163 }
164=======
165 // keep alives and not up? yes-> link connection request is stale!
166 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
167>>>>>>> .merge-rechts.r5869
168
169 // increase counter
170 ld->keepAliveMissed++;
171
172 // missed more than four keep-alive messages (10 sec)? -> drop link
173 if (ld->keepAliveMissed > 4) {
174 logging_info( "Link connection request is stale, closing: " << ld );
175 ld->relaying = false;
176 oldlinks.push_back( ld );
177 continue;
178 }
179 }
180
181 if (!ld->up) continue;
182
183<<<<<<< .working
184 // get descriptor
185 BOOST_FOREACH( LinkDescriptor* lp, links )
186 if (lp->remoteNode == relayNode &&
187 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
188 lp->relay == false &&
189 lp->up)
190 return lp;
191=======
192 // drop links that are dropped and not used as relay
193 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
194 oldlinks.push_back( ld );
195 continue;
196 }
197
198 // auto-link time exceeded?
199 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
200 oldlinks.push_back( ld );
201 continue;
202>>>>>>> .merge-rechts.r5869
203
204 // keep alives missed? yes->
205 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
206
207 // increase counter
208 ld->keepAliveMissed++;
209
210 // missed more than four keep-alive messages (4 sec)? -> drop link
211 if (ld->keepAliveMissed >= 4) {
212 logging_info( "Link is stale, closing: " << ld );
213 ld->relaying = false;
214 oldlinks.push_back( ld );
215 continue;
216 }
217 }
218 }
219
220 // drop links
221 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
222/*
223 vector<LinkID>::iterator it = std::find(
224 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
225
226 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
227 logging_info( "Not dropping initiator link: " << ld );
228 continue;
229 }
230*/
231 logging_info( "Link timed out. Dropping " << ld );
232 dropLink( ld->overlayId );
233 }
234
235 // show link state
236 counter++;
237 if (counter>=4) showLinks();
238 if (counter>=4 || counter<0) counter = 0;
239}
240
241void BaseOverlay::showLinks() {
242 int i=0;
243 logging_info("--- link state -------------------------------");
244 BOOST_FOREACH( LinkDescriptor* ld, links ) {
245 logging_info("link " << i << ": " << ld);
246 i++;
247 }
248 logging_info("----------------------------------------------");
249}
250
251// internal message delivery ---------------------------------------------------
252
253/// routes a message to its destination node
254void BaseOverlay::route( OverlayMsg* message, LinkDescriptor* incomingLink ) {
255
256 // exceeded time-to-live? yes-> drop message
257 if (message->getNumHops() > message->getTimeToLive()) {
258 logging_warn("Message exceeded TTL -> drop message!");
259 return;
260
261 // no-> forward message
262 } else {
263 // destinastion myself? yes-> handle message
264 if (message->getDestinationNode() == nodeId)
265 handleMessage( message, incomingLink );
266 else
267 // no->send message to next hop
268 send( message, message->getDestinationNode() );
269 }
270}
271
272/// sends a message to another node, delivers it to the base overlay class
273seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
274 LinkDescriptor* next_link = NULL;
275
276 // drop messages to unspecified destinations
277 if (destination.isUnspecified()) return -1;
278
279 // send messages to myself -> handle message and drop warning!
280 if (destination == nodeId) {
281 logging_warn("Sent message to myself. Handling message.")
282 Message msg;
283 msg.encapsulate(message);
284 handleMessage( &msg, NULL );
285 return -1;
286 }
287
288 // relay path known? yes-> send over relay link
289 next_link = getRelayLinkTo( destination );
290 if (next_link != NULL) {
291 next_link->setRelaying();
292 return send(message, next_link);
293 }
294
295 // no-> relay path! route over overlay path
296 LinkID next_id = overlayInterface->getNextLinkId( destination );
297 if (next_id.isUnspecified()) {
298 logging_error("Could not send message. No next hop found to " << destination );
299 return -1;
300 }
301
302 // get link descriptor, up and running? yes-> send message
303 next_link = getDescriptor(next_id);
304 if (next_link != NULL && next_link->up)
305 return send(message, next_link);
306
307 // no-> error, dropping message
308 else {
309 logging_error("Could not send message. Link not known or up");
310 return -1;
311 }
312
313 // not reached-> fail
314 return -1;
315}
316
317/// send a message using a link descriptor, delivers it to the base overlay class
318seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ld, bool ignore_down ) {
319 assert(ld!=NULL);
320
321 // check if up
322 if (!ld->up && !ignore_down) {
323 logging_error("Can not send message. Link not up:" << ld );
324 return -1;
325 }
326
327 // handle relayed link
328 if (ld->relayed) {
329 logging_debug("send(): Resolving direct link for relayed link to "
330 << ld->remoteNode);
331 ld = getRelayLinkTo( ld->remoteNode );
332 if (ld==NULL) {
333 logging_error("Direct link not found.");
334 return -1;
335 }
336 message->setRelayed();
337 ld->setRelaying();
338 }
339
340 // handle direct link
341 if (ld->communicationUp) {
342 logging_debug("send(): Sending message over direct link.");
343 return bc->sendMessage( ld->communicationId, message );
344 } else {
345 logging_error("send(): Could not send mesage. "
346 "Not a relayed link and direct link is not up.");
347 return -1;
348 }
349 return -1;
350}
351
352seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
353 const ServiceID& service) {
354 message->setSourceNode(nodeId);
355 message->setService(service);
356 message->setDestinationNode(remote);
357 send( message, remote );
358}
359
360seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
361 LinkDescriptor* ld = getDescriptor(link);
362 if (ld==NULL) {
363 logging_error("Cannot find descriptor to link id=" << link.toString());
364 return -1;
365 }
366 message->setSourceNode(nodeId);
367 message->setSourceLink(ld->overlayId);
368 message->setService(ld->service);
369 message->setDestinationNode(ld->remoteNode);
370 message->setDestinationLink(ld->remoteLink);
371 return send( message, ld, ignore_down );
372}
373
374// relay route management ------------------------------------------------------
375
376/// stabilize relay information
377void BaseOverlay::stabilizeRelays() {
378 vector<relay_route>::iterator i = relay_routes.begin();
379 while (i!=relay_routes.end() ) {
380 relay_route& route = *i;
381 LinkDescriptor* ld = getDescriptor(route.link);
382 if (ld==NULL
383 || !ld->up
384 || difftime(route.used, time(NULL)) > 4) {
385 logging_info("Forgetting relay information to node "
386 << route.node.toString() );
387 i = relay_routes.erase(i);
388 } else
389 i++;
390 }
391}
392
393void BaseOverlay::removeRelayLink( const LinkID& link ) {
394 vector<relay_route>::iterator i = relay_routes.begin();
395 while (i!=relay_routes.end() ) {
396 relay_route& route = *i;
397 if (route.link == link ) i = relay_routes.erase(i); else i++;
398 }
399}
400
401void BaseOverlay::removeRelayNode( const NodeID& remote ) {
402 vector<relay_route>::iterator i = relay_routes.begin();
403 while (i!=relay_routes.end() ) {
404 relay_route& route = *i;
405 if (route.node == remote ) i = relay_routes.erase(i); else i++;
406 }
407}
408
409/// refreshes relay information
410void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
411
412 // handle relayed messages from real links only
413 if (ld == NULL
414 || ld->relayed
415 || !message->isRelayed()
416 || message->getSourceNode()==nodeId ) return;
417
418 // check wheter this node is already part of the routing table
419 LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode());
420 if (next_link == ld->overlayId) return;
421 ld->setRelaying();
422
423 // try to find source node
424 BOOST_FOREACH( relay_route& route, relay_routes ) {
425
426 // relay route found? yes->
427 if ( route.node == message->getSourceNode() ) {
428
429 // refresh timer
430 route.used = time(NULL);
431
432 // route has a shorter hop count? yes-> replace
433 if (route.hops > message->getNumHops() ) {
434 logging_info("Updating relay information to node "
435 << route.node.toString()
436 << " reducing to " << message->getNumHops() << " hops.");
437 route.hops = message->getNumHops();
438 route.link = ld->overlayId;
439 }
440 return;
441 }
442 }
443
444 // not found-> add new entry
445 relay_route route;
446 route.hops = message->getNumHops();
447 route.link = ld->overlayId;
448 route.node = message->getSourceNode();
449 route.used = time(NULL);
450 logging_info("Remembering relay information to node " << route.node.toString());
451 relay_routes.push_back(route);
452}
453
454/// returns a known "vital" relay link which is up and running
455LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
456 // try to find source node
457 BOOST_FOREACH( relay_route& route, relay_routes ) {
458 if (route.node == remote ) {
459 LinkDescriptor* ld = getDescriptor( route.link );
460 if (ld==NULL || !ld->up) return NULL; else return ld;
461 }
462 }
463 return NULL;
464}
465
466/* *****************************************************************************
467 * PUBLIC MEMBERS
468 * ****************************************************************************/
469
470use_logging_cpp(BaseOverlay);
471
472// ----------------------------------------------------------------------------
473
474BaseOverlay::BaseOverlay() :
475 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
476 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
477 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
478}
479
480BaseOverlay::~BaseOverlay() {
481}
482
483// ----------------------------------------------------------------------------
484
485void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
486 logging_info("Starting...");
487
488 // set parameters
489 bc = &_basecomm;
490 nodeId = _nodeid;
491
492 // register at base communication
493 bc->registerMessageReceiver( this );
494 bc->registerEventListener( this );
495
496 // timer for auto link management
497 Timer::setInterval( 1000 );
498 Timer::start();
499
500 started = true;
501 state = BaseOverlayStateInvalid;
502}
503
504void BaseOverlay::stop() {
505 logging_info("Stopping...");
506
507 // stop timer
508 Timer::stop();
509
510 // delete oberlay interface
511 if(overlayInterface != NULL) {
512 delete overlayInterface;
513 overlayInterface = NULL;
514 }
515
516 // unregister at base communication
517 bc->unregisterMessageReceiver( this );
518 bc->unregisterEventListener( this );
519
520 started = false;
521 state = BaseOverlayStateInvalid;
522}
523
524bool BaseOverlay::isStarted(){
525 return started;
526}
527
528// ----------------------------------------------------------------------------
529
530void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
531 const EndpointDescriptor& bootstrapEp) {
532
533 if(id != spovnetId){
534 logging_error("attempt to join against invalid spovnet, call initiate first");
535 return;
536 }
537
538 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
539 logging_info( "Starting to join spovnet " << id.toString() <<
540 " with nodeid " << nodeId.toString());
541
542 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
543
544 // bootstrap against ourselfs
545 logging_info("joining spovnet locally");
546
547 overlayInterface->joinOverlay();
548 state = BaseOverlayStateCompleted;
549 BOOST_FOREACH( NodeListener* i, nodeListeners )
550 i->onJoinCompleted( spovnetId );
551
552 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
553 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
554
555 logging_debug("starting overlay bootstrap module");
556 overlayBootstrap.start(this, spovnetId, nodeId);
557 overlayBootstrap.publish(bc->getEndpointDescriptor());
558
559 } else {
560
561 // bootstrap against another node
562 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
563
564 const LinkID& lnk = bc->establishLink( bootstrapEp );
565 bootstrapLinks.push_back(lnk);
566
567 logging_info("join process initiated for " << id.toString() << "...");
568
569 }
570}
571
572void BaseOverlay::leaveSpoVNet() {
573
574 logging_info( "Leaving spovnet " << spovnetId );
575 bool ret = ( state != this->BaseOverlayStateInvalid );
576
577 logging_debug("stopping overlay bootstrap module");
578 overlayBootstrap.stop();
579 overlayBootstrap.revoke();
580
581 logging_debug( "Dropping all auto-links" );
582
583 // gather all service links
584 vector<LinkID> servicelinks;
585 BOOST_FOREACH( LinkDescriptor* ld, links ) {
586 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
587 servicelinks.push_back( ld->overlayId );
588 }
589
590 // drop all service links
591 BOOST_FOREACH( LinkID lnk, servicelinks )
592 dropLink( lnk );
593
594 // let the node leave the spovnet overlay interface
595 logging_debug( "Leaving overlay" );
596 if( overlayInterface != NULL )
597 overlayInterface->leaveOverlay();
598
599 // drop still open bootstrap links
600 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
601 bc->dropLink( lnk );
602
603 // change to inalid state
604 state = BaseOverlayStateInvalid;
605 //ovl.visShutdown( ovlId, nodeId, string("") );
606
607 // inform all registered services of the event
608 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
609 if( ret ) i->onLeaveCompleted( spovnetId );
610 else i->onLeaveFailed( spovnetId );
611 }
612}
613
614void BaseOverlay::createSpoVNet(const SpoVNetID& id,
615 const OverlayParameterSet& param,
616 const SecurityParameterSet& sec,
617 const QoSParameterSet& qos) {
618
619 // set the state that we are an initiator, this way incoming messages are
620 // handled correctly
621 logging_info( "creating spovnet " + id.toString() <<
622 " with nodeid " << nodeId.toString() );
623
624 spovnetId = id;
625
626 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
627 if( overlayInterface == NULL ) {
628 logging_fatal( "overlay structure not supported" );
629 state = BaseOverlayStateInvalid;
630
631 BOOST_FOREACH( NodeListener* i, nodeListeners )
632 i->onJoinFailed( spovnetId );
633
634 return;
635 }
636}
637
638// ----------------------------------------------------------------------------
639
640const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
641 const NodeID& remoteId, const ServiceID& service ) {
642
643 // establish link via overlay
644 if (!remoteId.isUnspecified())
645 return establishLink( remoteId, service );
646 else
647
648 // establish link directly if only ep is known
649 if (remoteId.isUnspecified())
650 return establishDirectLink(remoteEp, service );
651
652}
653
654/// call base communication's establish link and add link mapping
655const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
656 const ServiceID& service ) {
657
658 /// find a service listener
659 if( !communicationListeners.contains( service ) ) {
660 logging_error( "No listener registered for service id=" << service.toString() );
661 return LinkID::UNSPECIFIED;
662 }
663 CommunicationListener* listener = communicationListeners.get( service );
664 assert( listener != NULL );
665
666 // create descriptor
667 LinkDescriptor* ld = addDescriptor();
668 ld->relayed = false;
669 ld->listener = listener;
670 ld->service = service;
671 ld->communicationId = bc->establishLink( ep );
672
673 /// establish link and add mapping
674 logging_info("Establishing direct link " << ld->communicationId.toString()
675 << " using " << ep.toString());
676
677 return ld->communicationId;
678}
679
680/// establishes a link between two arbitrary nodes
681const LinkID BaseOverlay::establishLink( const NodeID& remote,
682 const ServiceID& service ) {
683
684 // do not establish a link to myself!
685 if (remote == nodeId) return LinkID::UNSPECIFIED;
686
687 // create a link descriptor
688 LinkDescriptor* ld = addDescriptor();
689 ld->relayed = true;
690 ld->remoteNode = remote;
691 ld->service = service;
692 ld->listener = getListener(ld->service);
693
694 // create link request message
695 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
696 msg.setSourceLink(ld->overlayId);
697 msg.setRelayed(true);
698
699 // debug message
700 logging_info(
701 "Sending link request with"
702 << " link=" << ld->overlayId.toString()
703 << " node=" << ld->remoteNode.toString()
704 << " serv=" << ld->service.toString()
705 );
706
707 // sending message to node
708 send_node( &msg, ld->remoteNode, ld->service );
709
710 return ld->overlayId;
711}
712
713/// drops an established link
714void BaseOverlay::dropLink(const LinkID& link) {
715 logging_info( "Dropping link (initiated locally):" << link.toString() );
716
717 // find the link item to drop
718 LinkDescriptor* ld = getDescriptor(link);
719 if( ld == NULL ) {
720 logging_warn( "Can't drop link, link is unknown!");
721 return;
722 }
723
724 // delete all queued messages
725 if( ld->messageQueue.size() > 0 ) {
726 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
727 << ld->messageQueue.size() << " waiting messages" );
728 ld->flushQueue();
729 }
730
731 // inform sideport and listener
732 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
733 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
734
735 // do not drop relay links
736 if (!ld->relaying) {
737 // drop the link in base communication
738 if (ld->communicationUp) bc->dropLink( ld->communicationId );
739
740 // erase descriptor
741 eraseDescriptor( ld->overlayId );
742 } else
743 ld->dropAfterRelaying = true;
744}
745
746// ----------------------------------------------------------------------------
747
748/// internal send message, always use this functions to send messages over links
749seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
750 logging_debug( "Sending data message on link " << link.toString() );
751
752 // get the mapping for this link
753 LinkDescriptor* ld = getDescriptor(link);
754 if( ld == NULL ) {
755 logging_error("Could not send message. "
756 << "Link not found id=" << link.toString());
757 return -1;
758 }
759
760 // check if the link is up yet, if its an auto link queue message
761 if( !ld->up ) {
762 ld->setAutoUsed();
763 if( ld->autolink ) {
764 logging_info("Auto-link " << link.toString() << " not up, queue message");
765 Data data = data_serialize( message );
766 const_cast<Message*>(message)->dropPayload();
767 ld->messageQueue.push_back( new Message(data) );
768 } else {
769 logging_error("Link " << link.toString() << " not up, drop message");
770 }
771 return -1;
772 }
773
774 // compile overlay message (has service and node id)
775 OverlayMsg overmsg( OverlayMsg::typeData );
776 overmsg.encapsulate( const_cast<Message*>(message) );
777
778 // send message over relay/direct/overlay
779 return send_link( &overmsg, ld->overlayId );
780}
781
782seqnum_t BaseOverlay::sendMessage(const Message* message,
783 const NodeID& node, const ServiceID& service) {
784
785 // find link for node and service
786 LinkDescriptor* ld = getAutoDescriptor( node, service );
787
788 // if we found no link, create an auto link
789 if( ld == NULL ) {
790
791 // debug output
792 logging_info( "No link to send message to node "
793 << node.toString() << " found for service "
794 << service.toString() << ". Creating auto link ..."
795 );
796
797 // call base overlay to create a link
798 LinkID link = establishLink( node, service );
799 ld = getDescriptor( link );
800 if( ld == NULL ) {
801 logging_error( "Failed to establish auto-link.");
802 return -1;
803 }
804 ld->autolink = true;
805
806 logging_debug( "Auto-link establishment in progress to node "
807 << node.toString() << " with link id=" << link.toString() );
808 }
809 assert(ld != NULL);
810
811 // mark the link as used, as we now send a message through it
812 ld->setAutoUsed();
813
814 // send / queue message
815 return sendMessage( message, ld->overlayId );
816}
817
818// ----------------------------------------------------------------------------
819
820const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
821 const LinkID& link) const {
822
823 // return own end-point descriptor
824 if( link == LinkID::UNSPECIFIED )
825 return bc->getEndpointDescriptor();
826
827 // find link descriptor. not found -> return unspecified
828 const LinkDescriptor* ld = getDescriptor(link);
829 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
830
831 // return endpoint-descriptor from base communication
832 return bc->getEndpointDescriptor( ld->communicationId );
833}
834
835const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
836 const NodeID& node) const {
837
838 // return own end-point descriptor
839 if( node == nodeId || node == NodeID::UNSPECIFIED )
840 return bc->getEndpointDescriptor();
841
842 // no joined and request remote descriptor? -> fail!
843 if( overlayInterface == NULL ) {
844 logging_error( "overlay interface not set, cannot resolve endpoint" );
845 return EndpointDescriptor::UNSPECIFIED();
846 }
847
848 // resolve end-point descriptor from the base-overlay routing table
849 return overlayInterface->resolveNode( node );
850}
851
852// ----------------------------------------------------------------------------
853
854bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
855 sideport = _sideport;
856 _sideport->configure( this );
857}
858
859bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
860 sideport = &SideportListener::DEFAULT;
861}
862
863// ----------------------------------------------------------------------------
864
865bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
866 logging_debug( "binding communication listener " << listener
867 << " on serviceid " << sid.toString() );
868
869 if( communicationListeners.contains( sid ) ) {
870 logging_error( "some listener already registered for service id "
871 << sid.toString() );
872 return false;
873 }
874
875 communicationListeners.registerItem( listener, sid );
876 return true;
877}
878
879
880bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
881 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
882
883 if( !communicationListeners.contains( sid ) ) {
884 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
885 return false;
886 }
887
888 if( communicationListeners.get(sid) != listener ) {
889 logging_warn( "listener bound to service id " << sid.toString()
890 << " is different than listener trying to unbind" );
891 return false;
892 }
893
894 communicationListeners.unregisterItem( sid );
895 return true;
896}
897
898// ----------------------------------------------------------------------------
899
900bool BaseOverlay::bind(NodeListener* listener) {
901 logging_debug( "Binding node listener " << listener );
902
903 // already bound? yes-> warning
904 NodeListenerVector::iterator i =
905 find( nodeListeners.begin(), nodeListeners.end(), listener );
906 if( i != nodeListeners.end() ) {
907 logging_warn("Node listener " << listener << " is already bound!" );
908 return false;
909 }
910
911 // no-> add
912 nodeListeners.push_back( listener );
913 return true;
914}
915
916bool BaseOverlay::unbind(NodeListener* listener) {
917 logging_debug( "Unbinding node listener " << listener );
918
919 // already unbound? yes-> warning
920 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
921 if( i == nodeListeners.end() ) {
922 logging_warn( "Node listener " << listener << " is not bound!" );
923 return false;
924 }
925
926 // no-> remove
927 nodeListeners.erase( i );
928 return true;
929}
930
931// ----------------------------------------------------------------------------
932
933void BaseOverlay::onLinkUp(const LinkID& id,
934 const address_v* local, const address_v* remote) {
935 logging_debug( "Link up with base communication link id=" << id );
936
937 // get descriptor for link
938 LinkDescriptor* ld = getDescriptor(id, true);
939
940 // handle bootstrap link we initiated
941 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
942 logging_info(
943 "Join has been initiated by me and the link is now up. " <<
944 "Sending out join request for SpoVNet " << spovnetId.toString()
945 );
946
947 // send join request message
948 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
949 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
950 JoinRequest joinRequest( spovnetId, nodeId );
951 overlayMsg.encapsulate( &joinRequest );
952 bc->sendMessage( id, &overlayMsg );
953 return;
954 }
955
956 // no link found? -> link establishment from remote, add one!
957 if (ld == NULL) {
958 ld = addDescriptor( id );
959 logging_info( "onLinkUp (remote request) descriptor: " << ld );
960
961 // update descriptor
962 ld->fromRemote = true;
963 ld->communicationId = id;
964 ld->communicationUp = true;
965 ld->setAutoUsed();
966 ld->setAlive();
967
968 // in this case, do not inform listener, since service it unknown
969 // -> wait for update message!
970
971 // link mapping found? -> send update message with node-id and service id
972 } else {
973 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
974
975 // update descriptor
976 ld->setAutoUsed();
977 ld->setAlive();
978 ld->communicationUp = true;
979 ld->fromRemote = false;
980
981 // if link is a relayed link->convert to direct link
982 if (ld->relayed) {
983 logging_info( "Converting to direct link: " << ld );
984 ld->up = true;
985 ld->relayed = false;
986 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
987 overMsg.setSourceLink( ld->overlayId );
988 overMsg.setDestinationLink( ld->remoteLink );
989 send_link( &overMsg, ld->overlayId );
990 } else {
991 // note: necessary to validate the link on the remote side!
992 logging_info( "Sending out update" <<
993 " for service " << ld->service.toString() <<
994 " with local node id " << nodeId.toString() <<
995 " on link " << ld->overlayId.toString() );
996
997 // compile and send update message
998 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
999 overlayMsg.setSourceLink(ld->overlayId);
1000 overlayMsg.setAutoLink( ld->autolink );
1001 send_link( &overlayMsg, ld->overlayId, true );
1002 }
1003 }
1004}
1005
1006void BaseOverlay::onLinkDown(const LinkID& id,
1007 const address_v* local, const address_v* remote) {
1008
1009 // erase bootstrap links
1010 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1011 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1012
1013 // get descriptor for link
1014 LinkDescriptor* ld = getDescriptor(id, true);
1015 if ( ld == NULL ) return; // not found? ->ignore!
1016 logging_info( "onLinkDown descriptor: " << ld );
1017
1018 // removing relay link information
1019 removeRelayLink(ld->overlayId);
1020
1021 // inform listeners about link down
1022 ld->communicationUp = false;
1023 if (!ld->service.isUnspecified()) {
1024 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1025 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1026 }
1027
1028 // delete all queued messages (auto links)
1029 if( ld->messageQueue.size() > 0 ) {
1030 logging_warn( "Dropping link " << id.toString() << " that has "
1031 << ld->messageQueue.size() << " waiting messages" );
1032 ld->flushQueue();
1033 }
1034
1035 // erase mapping
1036 eraseDescriptor(ld->overlayId);
1037}
1038
1039void BaseOverlay::onLinkChanged(const LinkID& id,
1040 const address_v* oldlocal, const address_v* newlocal,
1041 const address_v* oldremote, const address_v* newremote) {
1042
1043 // get descriptor for link
1044 LinkDescriptor* ld = getDescriptor(id, true);
1045 if ( ld == NULL ) return; // not found? ->ignore!
1046 logging_debug( "onLinkChanged descriptor: " << ld );
1047
1048 // inform listeners
1049 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1050 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1051
1052 // autolinks: refresh timestamp
1053 ld->setAutoUsed();
1054}
1055
1056void BaseOverlay::onLinkFail(const LinkID& id,
1057 const address_v* local, const address_v* remote) {
1058 logging_debug( "Link fail with base communication link id=" << id );
1059
1060 // erase bootstrap links
1061 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1062 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1063
1064 // get descriptor for link
1065 LinkDescriptor* ld = getDescriptor(id, true);
1066 if ( ld == NULL ) return; // not found? ->ignore!
1067 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1068
1069 // inform listeners
1070 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1071 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1072}
1073
1074void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1075 const address_v* remote, const QoSParameterSet& qos) {
1076 logging_debug( "Link quality changed with base communication link id=" << id );
1077
1078 // get descriptor for link
1079 LinkDescriptor* ld = getDescriptor(id, true);
1080 if ( ld == NULL ) return; // not found? ->ignore!
1081 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1082}
1083
1084bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1085 const address_v* remote ) {
1086 logging_debug("Accepting link request from " << remote->to_string() );
1087 return true;
1088}
1089
1090/// handles a message from base communication
1091bool BaseOverlay::receiveMessage(const Message* message,
1092 const LinkID& link, const NodeID& ) {
1093 // get descriptor for link
1094 LinkDescriptor* ld = getDescriptor( link, true );
1095 return handleMessage( message, ld, link );
1096}
1097
1098// ----------------------------------------------------------------------------
1099
1100/// Handle spovnet instance join requests
1101bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1102
1103 // decapsulate message
1104 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1105 logging_info( "Received join request for spovnet " <<
1106 joinReq->getSpoVNetID().toString() );
1107
1108 // check spovnet id
1109 if( joinReq->getSpoVNetID() != spovnetId ) {
1110 logging_error(
1111 "Received join request for spovnet we don't handle " <<
1112 joinReq->getSpoVNetID().toString() );
1113 return false;
1114 }
1115
1116 // TODO: here you can implement mechanisms to deny joining of a node
1117 bool allow = true;
1118 logging_info( "Sending join reply for spovnet " <<
1119 spovnetId.toString() << " to node " <<
1120 overlayMsg->getSourceNode().toString() <<
1121 ". Result: " << (allow ? "allowed" : "denied") );
1122 joiningNodes.push_back( overlayMsg->getSourceNode() );
1123
1124 // return overlay parameters
1125 assert( overlayInterface != NULL );
1126 logging_debug( "Using bootstrap end-point "
1127 << getEndpointDescriptor().toString() )
1128 OverlayParameterSet parameters = overlayInterface->getParameters();
1129 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1130 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1131 JoinReply replyMsg( spovnetId, parameters,
1132 allow, getEndpointDescriptor() );
1133 retmsg.encapsulate(&replyMsg);
1134 bc->sendMessage( bcLink, &retmsg );
1135
1136 return true;
1137}
1138
1139/// Handle replies to spovnet instance join requests
1140bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1141 // decapsulate message
1142 logging_debug("received join reply message");
1143 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1144
1145 // correct spovnet?
1146 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1147 logging_error( "Received SpoVNet join reply for " <<
1148 replyMsg->getSpoVNetID().toString() <<
1149 " != " << spovnetId.toString() );
1150 delete replyMsg;
1151 return false;
1152 }
1153
1154 // access granted? no -> fail
1155 if( !replyMsg->getJoinAllowed() ) {
1156 logging_error( "Our join request has been denied" );
1157
1158 // drop initiator link
1159 if( !bcLink.isUnspecified() ){
1160 bc->dropLink( bcLink );
1161
1162 vector<LinkID>::iterator it = std::find(
1163 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1164 if( it != bootstrapLinks.end() )
1165 bootstrapLinks.erase(it);
1166 }
1167
1168 // inform all registered services of the event
1169 BOOST_FOREACH( NodeListener* i, nodeListeners )
1170 i->onJoinFailed( spovnetId );
1171
1172 delete replyMsg;
1173 return true;
1174 }
1175
1176 // access has been granted -> continue!
1177 logging_info("Join request has been accepted for spovnet " <<
1178 spovnetId.toString() );
1179
1180 logging_debug( "Using bootstrap end-point "
1181 << replyMsg->getBootstrapEndpoint().toString() );
1182
1183 // create overlay structure from spovnet parameter set
1184 // if we have not boostrapped yet against some other node
1185 if( overlayInterface == NULL ){
1186
1187 logging_debug("first-time bootstrapping");
1188
1189 overlayInterface = OverlayFactory::create(
1190 *this, replyMsg->getParam(), nodeId, this );
1191
1192 // overlay structure supported? no-> fail!
1193 if( overlayInterface == NULL ) {
1194 logging_error( "overlay structure not supported" );
1195
1196 if( !bcLink.isUnspecified() ){
1197 bc->dropLink( bcLink );
1198
1199 vector<LinkID>::iterator it = std::find(
1200 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1201 if( it != bootstrapLinks.end() )
1202 bootstrapLinks.erase(it);
1203 }
1204
1205 // inform all registered services of the event
1206 BOOST_FOREACH( NodeListener* i, nodeListeners )
1207 i->onJoinFailed( spovnetId );
1208
1209 delete replyMsg;
1210 return true;
1211 }
1212
1213 // everything ok-> join the overlay!
1214 state = BaseOverlayStateCompleted;
1215 overlayInterface->createOverlay();
1216
1217 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1218 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1219
1220 // update ovlvis
1221 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1222
1223 // inform all registered services of the event
1224 BOOST_FOREACH( NodeListener* i, nodeListeners )
1225 i->onJoinCompleted( spovnetId );
1226
1227 delete replyMsg;
1228
1229<<<<<<< .working
1230 // inform all registered services of the event
1231 BOOST_FOREACH( NodeListener* i, nodeListeners )
1232 i->onJoinFailed( spovnetId );
1233=======
1234 } else {
1235>>>>>>> .merge-rechts.r5869
1236
1237 // this is not the first bootstrap, just join the additional node
1238 logging_debug("not first-time bootstrapping");
1239 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1240 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1241
1242 delete replyMsg;
1243 } // if( overlayInterface == NULL )
1244
1245 return true;
1246}
1247
1248
1249<<<<<<< .working
1250=======
1251bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1252 // get service
1253 const ServiceID& service = overlayMsg->getService();
1254 logging_debug( "Received data for service " << service.toString()
1255 << " on link " << overlayMsg->getDestinationLink().toString() );
1256
1257>>>>>>> .merge-rechts.r5869
1258 // delegate data message
1259 getListener(service)->onMessage(
1260 overlayMsg,
1261 overlayMsg->getSourceNode(),
1262 overlayMsg->getDestinationLink()
1263 );
1264
1265 return true;
1266}
1267
1268<<<<<<< .working
1269=======
1270
1271>>>>>>> .merge-rechts.r5869
1272bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1273
1274<<<<<<< .working
1275 //record bootstrap ep as good endpoint to join
1276 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1277
1278 delete replyMsg;
1279 ret = true;
1280 break;
1281 }
1282=======
1283 if( ld == NULL ) {
1284 logging_warn( "received overlay update message for link for "
1285 << "which we have no mapping" );
1286 return false;
1287 }
1288 logging_info("Received type update message on link " << ld );
1289>>>>>>> .merge-rechts.r5869
1290
1291 // update our link mapping information for this link
1292 bool changed =
1293 ( ld->remoteNode != overlayMsg->getSourceNode() )
1294 || ( ld->service != overlayMsg->getService() );
1295
1296 // set parameters
1297 ld->up = true;
1298 ld->remoteNode = overlayMsg->getSourceNode();
1299 ld->remoteLink = overlayMsg->getSourceLink();
1300 ld->service = overlayMsg->getService();
1301 ld->autolink = overlayMsg->isAutoLink();
1302
1303 // if our link information changed, we send out an update, too
1304 if( changed ) {
1305 overlayMsg->swapRoles();
1306 overlayMsg->setSourceNode(nodeId);
1307 overlayMsg->setSourceLink(ld->overlayId);
1308 overlayMsg->setService(ld->service);
1309 send( overlayMsg, ld );
1310 }
1311
1312<<<<<<< .working
1313 // delegate data message
1314 listener->onMessage( overlayMsg,
1315 overlayMsg->getSourceNode(), ld->overlayId );
1316=======
1317 // service registered? no-> error!
1318 if( !communicationListeners.contains( ld->service ) ) {
1319 logging_warn( "Link up: event listener has not been registered" );
1320 return false;
1321 }
1322>>>>>>> .merge-rechts.r5869
1323
1324 // default or no service registered?
1325 CommunicationListener* listener = communicationListeners.get( ld->service );
1326 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1327 logging_warn("Link up: event listener is default or null!" );
1328 return true;
1329 }
1330
1331 // update descriptor
1332 ld->listener = listener;
1333 ld->setAutoUsed();
1334 ld->setAlive();
1335
1336 // ask the service whether it wants to accept this link
1337 if( !listener->onLinkRequest(ld->remoteNode) ) {
1338
1339 logging_debug("Link id=" << ld->overlayId.toString() <<
1340 " has been denied by service " << ld->service.toString() << ", dropping link");
1341
1342 // prevent onLinkDown calls to the service
1343 ld->listener = &CommunicationListener::DEFAULT;
1344
1345 // drop the link
1346 dropLink( ld->overlayId );
1347 return true;
1348 }
1349
1350 // set link up
1351 ld->up = true;
1352 logging_info( "Link has been accepted by service and is up: " << ld );
1353
1354 // auto links: link has been accepted -> send queued messages
1355 if( ld->messageQueue.size() > 0 ) {
1356 logging_info( "Sending out queued messages on link " << ld );
1357 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1358 sendMessage( msg, ld->overlayId );
1359 delete msg;
1360 }
1361 ld->messageQueue.clear();
1362 }
1363
1364 // call the notification functions
1365 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1366 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1367
1368 return true;
1369}
1370
1371/// handle a link request and reply
1372bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1373 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1374
1375 //TODO: Check if a request has already been sent using getSourceLink() ...
1376
1377 // create link descriptor
1378 LinkDescriptor* ldn = addDescriptor();
1379
1380 // flags
1381 ldn->up = true;
1382 ldn->fromRemote = true;
1383 ldn->relayed = true;
1384
1385 // parameters
1386 ldn->service = overlayMsg->getService();
1387 ldn->listener = getListener(ldn->service);
1388 ldn->remoteNode = overlayMsg->getSourceNode();
1389 ldn->remoteLink = overlayMsg->getSourceLink();
1390
1391 // update time-stamps
1392 ldn->setAlive();
1393 ldn->setAutoUsed();
1394
1395 // create reply message and send back!
1396 overlayMsg->swapRoles(); // swap source/destination
1397 overlayMsg->setType(OverlayMsg::typeLinkReply);
1398 overlayMsg->setSourceLink(ldn->overlayId);
1399 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1400 overlayMsg->setRelayed(true);
1401 send( overlayMsg, ld ); // send back to link
1402
1403 // inform listener
1404 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1405
1406 return true;
1407}
1408
1409bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1410
1411 // find link request
1412 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1413
1414 // not found? yes-> drop with error!
1415 if (ldn == NULL) {
1416 logging_error( "No link request pending for "
1417 << overlayMsg->getDestinationLink().toString() );
1418 return false;
1419 }
1420 logging_debug("Handling link reply for " << ldn )
1421
1422 // check if already up
1423 if (ldn->up) {
1424 logging_warn( "Link already up: " << ldn );
1425 return true;
1426 }
1427
1428 // debug message
1429 logging_debug( "Link request reply received. Establishing link"
1430 << " for service " << overlayMsg->getService().toString()
1431 << " with local id=" << overlayMsg->getDestinationLink()
1432 << " and remote link id=" << overlayMsg->getSourceLink()
1433 << " to " << overlayMsg->getSourceEndpoint().toString()
1434 );
1435
1436 // set local link descriptor data
1437 ldn->up = true;
1438 ldn->relayed = true;
1439 ldn->service = overlayMsg->getService();
1440 ldn->listener = getListener(ldn->service);
1441 ldn->remoteLink = overlayMsg->getSourceLink();
1442 ldn->remoteNode = overlayMsg->getSourceNode();
1443
1444 // update timestamps
1445 ldn->setAlive();
1446 ldn->setAutoUsed();
1447
1448 // auto links: link has been accepted -> send queued messages
1449 if( ldn->messageQueue.size() > 0 ) {
1450 logging_info( "Sending out queued messages on link " <<
1451 ldn->overlayId.toString() );
1452 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1453 sendMessage( msg, ldn->overlayId );
1454 delete msg;
1455 }
1456 ldn->messageQueue.clear();
1457 }
1458
1459 // inform listeners about new link
1460 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1461
1462 // try to replace relay link with direct link
1463 ldn->communicationId =
1464 bc->establishLink( overlayMsg->getSourceEndpoint() );
1465
1466 return true;
1467}
1468
1469<<<<<<< .working
1470=======
1471/// handle a keep-alive message for a link
1472bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1473 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1474 if ( rld != NULL ) {
1475 logging_debug("Keep-Alive for " <<
1476 overlayMsg->getDestinationLink() );
1477 if (overlayMsg->isRouteRecord())
1478 rld->routeRecord = overlayMsg->getRouteRecord();
1479 rld->setAlive();
1480 return true;
1481 } else {
1482 logging_error("Keep-Alive for "
1483 << overlayMsg->getDestinationLink() << ": link unknown." );
1484 return false;
1485 }
1486}
1487
1488/// handle a direct link message
1489bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1490 logging_debug( "Received direct link replacement request" );
1491
1492>>>>>>> .merge-rechts.r5869
1493 /// get destination overlay link
1494 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1495 if (rld == NULL || ld == NULL) {
1496 logging_error("Direct link replacement: Link "
1497 << overlayMsg->getDestinationLink() << "not found error." );
1498 return false;
1499 }
1500 logging_info( "Received direct link convert notification for " << rld );
1501
1502 // update information
1503 rld->communicationId = ld->communicationId;
1504 rld->communicationUp = true;
1505 rld->relayed = false;
1506
1507<<<<<<< .working
1508 // mark incoming link as relay
1509 if (ld!=NULL) ld->markAsRelay();
1510
1511 // am I the destination of this message? yes->
1512 if (relayMsg->getDestNode() == nodeId ) {
1513 // deliver relay message locally!
1514 logging_debug("Relay message reached destination. Handling the message.");
1515 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1516 ret = true;
1517 break;
1518 }
1519
1520 // create route message
1521 OverlayMsg _overMsg( *overlayMsg );
1522 RelayMessage _relayMsg( *relayMsg );
1523 _relayMsg.setType( RelayMessage::typeRoute );
1524 _overMsg.encapsulate( &_relayMsg );
1525=======
1526 // mark used and alive!
1527 rld->setAlive();
1528 rld->setAutoUsed();
1529>>>>>>> .merge-rechts.r5869
1530
1531 // erase the original descriptor
1532 eraseDescriptor(ld->overlayId);
1533}
1534
1535/// handles an incoming message
1536bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1537 const LinkID bcLink ) {
1538 logging_debug( "Handling message: " << message->toString());
1539
1540<<<<<<< .working
1541 // mark incoming link as relay
1542 if (ld!=NULL) ld->markAsRelay();
1543
1544 // am I the destination of this message? yes->
1545 if (relayMsg->getDestNode() == nodeId ) {
1546 // deliver relay message locally!
1547 logging_debug("Relay message reached destination. Handling the message.");
1548 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1549 ret = true;
1550 break;
1551 }
1552
1553 // am I the relay for this message? yes->
1554 if (relayMsg->getRelayNode() == nodeId ) {
1555 logging_debug("I'm the relay for this message. Sending to destination.");
1556 OverlayMsg _overMsg( *overlayMsg );
1557 RelayMessage _relayMsg( *relayMsg );
1558 _overMsg.encapsulate(&_relayMsg);
1559=======
1560 // decapsulate overlay message
1561 OverlayMsg* overlayMsg =
1562 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1563 if( overlayMsg == NULL ) return false;
1564>>>>>>> .merge-rechts.r5869
1565
1566 // refresh relay information
1567 refreshRelayInformation( overlayMsg, ld );
1568
1569 // increase number of hops
1570 overlayMsg->increaseNumHops();
1571
1572 // update route record
1573 overlayMsg->addRouteRecord(nodeId);
1574
1575 // handle signaling messages (do not route!)
1576 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1577 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1578 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1579 delete overlayMsg;
1580 return true;
1581 }
1582
1583 // message for reached destination? no-> route message
1584 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1585 overlayMsg->getDestinationNode() != nodeId ) {
1586 logging_debug("Routing message "
1587 << " from " << overlayMsg->getSourceNode()
1588 << " to " << overlayMsg->getDestinationNode()
1589 );
1590
1591 route( overlayMsg, ld );
1592 delete overlayMsg;
1593 return true;
1594 }
1595
1596 // handle base overlay message
1597 bool ret = false; // return value
1598 switch ( overlayMsg->getType() ) {
1599
1600 // data transport messages
1601 case OverlayMsg::typeData:
1602 ret = handleData(overlayMsg, ld); break;
1603
1604 // overlay setup messages
1605 case OverlayMsg::typeJoinRequest:
1606 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1607 case OverlayMsg::typeJoinReply:
1608 ret = handleJoinReply(overlayMsg, bcLink ); break;
1609
1610 // link specific messages
1611 case OverlayMsg::typeLinkRequest:
1612 ret = handleLinkRequest(overlayMsg, ld ); break;
1613 case OverlayMsg::typeLinkReply:
1614 ret = handleLinkReply(overlayMsg, ld ); break;
1615 case OverlayMsg::typeLinkUpdate:
1616 ret = handleLinkUpdate(overlayMsg, ld ); break;
1617 case OverlayMsg::typeLinkAlive:
1618 ret = handleLinkAlive(overlayMsg, ld ); break;
1619 case OverlayMsg::typeLinkDirect:
1620 ret = handleLinkDirect(overlayMsg, ld ); break;
1621
1622 // handle unknown message type
1623 default: {
1624 logging_error( "received message in invalid state! don't know " <<
1625 "what to do with this message of type " << overlayMsg->getType() );
1626 ret = false;
1627 break;
1628 }
1629 }
1630
1631 // free overlay message and return value
1632 delete overlayMsg;
1633 return ret;
1634}
1635
1636// ----------------------------------------------------------------------------
1637
1638void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1639
1640 logging_debug( "broadcasting message to all known nodes " <<
1641 "in the overlay from service " + service.toString() );
1642
1643 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1644 OverlayInterface::NodeList::iterator i = nodes.begin();
1645 for(; i != nodes.end(); i++ ) {
1646 if( *i == nodeId) continue; // don't send to ourselfs
1647 sendMessage( message, *i, service );
1648 }
1649}
1650
1651/// return the overlay neighbors
1652vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1653
1654 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1655 // the known nodes _can_ also include our node, so we remove ourself
1656 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1657 if( i != nodes.end() ) nodes.erase( i );
1658 return nodes;
1659}
1660
1661const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1662 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1663 const LinkDescriptor* ld = getDescriptor(lid);
1664 if( ld == NULL ) return NodeID::UNSPECIFIED;
1665 else return ld->remoteNode;
1666}
1667
1668vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1669 vector<LinkID> linkvector;
1670 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1671 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1672 linkvector.push_back( ld->overlayId );
1673 }
1674 }
1675 return linkvector;
1676}
1677
1678
1679void BaseOverlay::onNodeJoin(const NodeID& node) {
1680 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1681 if( i == joiningNodes.end() ) return;
1682
1683 logging_info( "node has successfully joined baseoverlay and overlay structure "
1684 << node.toString() );
1685
1686 joiningNodes.erase( i );
1687}
1688
1689void BaseOverlay::eventFunction() {
1690<<<<<<< .working
1691
1692 // send keep-alive messages over established links
1693 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1694 if (!ld->up) continue;
1695 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1696 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1697 sendMessage( &overMsg, ld );
1698 }
1699
1700 // iterate over all links and check for time boundaries
1701 vector<LinkDescriptor*> oldlinks;
1702 time_t now = time(NULL);
1703 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1704 // remote used as relay flag
1705 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1706 ld->usedAsRelay = false;
1707
1708 // keep alives and not up? yes-> link connection request is stale!
1709 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1710
1711 // increase counter
1712 ld->keepAliveMissed++;
1713
1714 // missed more than four keep-alive messages (10 sec)? -> drop link
1715 if (ld->keepAliveMissed > 4) {
1716 logging_info( "Link connection request is stale, closing: " << ld );
1717 oldlinks.push_back( ld );
1718 continue;
1719 }
1720 }
1721
1722 if (!ld->up) continue;
1723
1724 // drop links that are dropped and not used as relay
1725 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1726 oldlinks.push_back( ld );
1727 continue;
1728 }
1729
1730 // auto-link time exceeded?
1731 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1732 oldlinks.push_back( ld );
1733 continue;
1734 }
1735
1736 // keep alives missed? yes->
1737 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1738
1739 // increase counter
1740 ld->keepAliveMissed++;
1741
1742 // missed more than four keep-alive messages (4 sec)? -> drop link
1743 if (ld->keepAliveMissed >= 8) {
1744 logging_info( "Link is stale, closing: " << ld );
1745 oldlinks.push_back( ld );
1746 continue;
1747 }
1748 }
1749 }
1750
1751 // drop links
1752 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1753/*
1754 vector<LinkID>::iterator it = std::find(
1755 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1756
1757 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1758 logging_info( "Not dropping initiator link: " << ld );
1759 continue;
1760 }
1761*/
1762 logging_info( "Link timed out. Dropping " << ld );
1763 dropLink( ld->overlayId );
1764 }
1765
1766 // show link state
1767 counter++;
1768 if (counter>=4) showLinkState();
1769 if (counter>=4 || counter<0) counter = 0;
1770=======
1771 stabilizeRelays();
1772 stabilizeLinks();
1773>>>>>>> .merge-rechts.r5869
1774}
1775
1776}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.