An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/BaseOverlay.cpp @ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 14 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 50.2 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51#include "ariba/overlay/messages/OverlayMsg.h"
52#include "ariba/overlay/messages/JoinRequest.h"
53#include "ariba/overlay/messages/JoinReply.h"
54#include "ariba/overlay/messages/LinkRequest.h"
55#include "ariba/overlay/messages/RelayMessage.h"
56
57#include "ariba/utility/misc/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
63        BOOST_FOREACH( LinkDescriptor* lp, links )
64                if ((communication ? lp->communicationId : lp->overlayId) == link)
65                        return lp;
66        return NULL;
67}
68
69const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
70        BOOST_FOREACH( const LinkDescriptor* lp, links )
71                if ((communication ? lp->communicationId : lp->overlayId) == link)
72                        return lp;
73        return NULL;
74}
75
76LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
77        BOOST_FOREACH( LinkDescriptor* lp, links )
78                if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up)
79                        return lp;
80        BOOST_FOREACH( LinkDescriptor* lp, links )
81                if (lp->autolink && lp->remoteNode == node && lp->service == service )
82                        return lp;
83        return NULL;
84}
85
86void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
87        for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
88                LinkDescriptor* ld = *i;
89                if ((communication ? ld->communicationId : ld->overlayId) == link) {
90                        delete ld;
91                        links.erase(i);
92                        break;
93                }
94        }
95}
96
97LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
98        LinkDescriptor* desc = getDescriptor( link );
99        if ( desc == NULL ) {
100                desc = new LinkDescriptor();
101                desc->overlayId = link;
102                links.push_back(desc);
103        }
104        return desc;
105}
106
107/// returns a direct link relay descriptor to the given relay node
108LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) {
109        BOOST_FOREACH( LinkDescriptor* lp, links )
110                if (lp->remoteNode == relayNode &&
111                    lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&
112                    lp->relay == false &&
113                    lp->up)
114                        return lp;
115        return NULL;
116}
117
118/// find a proper relay node
119const NodeID BaseOverlay::findRelayNode( const NodeID id ) {
120        LinkDescriptor* rld = NULL;
121        NodeID relayNode = NodeID::UNSPECIFIED;
122
123        // get used next hop towards node
124        LinkID rlid = overlayInterface->getNextLinkId(id);
125        while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {
126
127                // get descriptor of first hop
128                rld = getDescriptor(rlid);
129                logging_info( rld );
130
131                // is first hop a relay path? yes-> try to find real link!
132                if ( rld->relay )
133                        relayNode = getRelayDescriptor(rld->localRelay)->remoteNode;
134
135                // no-> a proper relay node has been found
136                else relayNode = rld->remoteNode;
137        }
138
139        // if first relay is unknown choose a arbitrary direct node as relay
140        if ( relayNode.isUnspecified() ) {
141                for (size_t i=0; i<links.size(); i++)
142                        if (links[i]->up &&
143                                links[i]->communicationUp &&
144                                !links[i]->relay &&
145                                links[i]->keepAliveMissed <= 1 &&
146                                links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
147                                relayNode = links[i]->remoteNode;
148                                break;
149                        }
150        }
151
152        // do not return myself or use the node as relay node
153        if (relayNode == nodeId)
154                return NodeID::UNSPECIFIED;
155        else {
156                logging_info( "Returning relay node " << relayNode.toString() );
157                return relayNode;
158        }
159}
160
161/// forwards a message over relays/directly using link descriptor
162seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) {
163
164        // directly send message
165        if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) {
166                logging_debug("Send: Sending message via Base Communication");
167                return bc->sendMessage( ld->communicationId, message );
168        }
169
170        // relay message
171        else if ( ld->relay ) {
172
173                logging_debug("Send: Relaying message to node "
174                        << ld->remoteNode.toString()
175                        << " using relay " << ld->localRelay
176                );
177/*
178                // get local relay link descriptor and mark as used for relaying
179                LinkDescriptor* rld = getRelayDescriptor(ld->localRelay);
180                if (rld==NULL) {
181                        logging_error("Send: Relay descriptor for relay " <<
182                                ld->localRelay.toString() << " is unknown.");
183                        return -1;
184                }
185                rld->markAsRelay();*/
186
187                // create a information relay message to inform the relay about
188                OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId );
189                RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId );
190                relayMsg.encapsulate( message );
191                overlay_msg.encapsulate( &relayMsg );
192
193                // route message to relay node in order to inform it!
194                logging_debug("sendMessage: Sending message over relayed link with" << ld );
195                sendOverlay( &overlay_msg, ld->localRelay );
196                return 0;
197        }
198
199        // error
200        else {
201                logging_error( "Could not send message descriptor=" << ld );
202                return -1;
203        }
204        return -1;
205}
206
207LinkDescriptor* BaseOverlay::getSendDescriptor( const NodeID& nodeid, bool follow ) {
208        for (size_t i=0; i<links.size(); i++)
209                if ( !links[i]->relay &&
210                        links[i]->up &&
211                        links[i]->communicationUp &&
212                        links[i]->keepAliveMissed <= 1 &&
213                        links[i]->remoteNode == nodeid &&
214                        links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
215                        return links[i];
216                }
217        LinkDescriptor* ld = getDescriptor(overlayInterface->getNextLinkId(nodeid));
218        if (ld != NULL && ld->relay && follow)
219                return getSendDescriptor(ld->localRelay, false);
220        return NULL;
221}
222
223/// routes a message over the overlay or directly sends it when a link is open
224seqnum_t BaseOverlay::sendOverlay( Message* message, const NodeID& nodeid ) {
225        for (size_t i=0; i<links.size(); i++)
226                if ( !links[i]->relay &&
227                        links[i]->up &&
228                        links[i]->communicationUp &&
229                        links[i]->keepAliveMissed <= 1 &&
230                        links[i]->remoteNode == nodeid &&
231                        links[i]->service == OverlayInterface::OVERLAY_SERVICE_ID) {
232                        links[i]->markAsRelay();
233                        return sendMessage( message, links[i] );
234                        break;
235                }
236        overlayInterface->routeMessage(nodeid, message);
237        return 0;
238}
239
240/// creates a link descriptor, apply relay semantics if possible
241LinkDescriptor* BaseOverlay::createLinkDescriptor(
242        const NodeID remoteNode, const ServiceID service, const LinkID link_id ) {
243
244        // find listener
245        if( !communicationListeners.contains( service ) ) {
246                logging_error( "No listener found for service " << service.toString() );
247                return NULL;
248        }
249        CommunicationListener* listener = communicationListeners.get( service );
250        assert( listener != NULL );
251
252        // copy link id
253        LinkID linkid = link_id;
254
255        // create link id if necessary
256        if ( linkid.isUnspecified() )
257                linkid = LinkID::create();
258
259        // create relay link descriptor
260        NodeID relayNode = findRelayNode(remoteNode);
261
262        // add descriptor
263        LinkDescriptor* ld = addDescriptor( linkid );
264        ld->overlayId  = linkid;
265        ld->service    = service;
266        ld->listener   = listener;
267        ld->remoteNode = remoteNode;
268
269        // set relay node if available
270        ld->relay      = !relayNode.isUnspecified();
271        ld->localRelay = relayNode;
272
273        if (!ld->relay)
274                logging_error("No relay found!");
275
276        // debug output
277        logging_debug( "Created link descriptor: " << ld );
278
279        return ld;
280}
281
282
283// ----------------------------------------------------------------------------
284
285use_logging_cpp(BaseOverlay);
286
287// ----------------------------------------------------------------------------
288
289BaseOverlay::BaseOverlay() :
290        bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
291        spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
292        sideport(&SideportListener::DEFAULT), started(false), counter(0) {
293}
294
295BaseOverlay::~BaseOverlay() {
296}
297
298// ----------------------------------------------------------------------------
299
300void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
301        logging_info("Starting...");
302
303        // set parameters
304        bc = &_basecomm;
305        nodeId = _nodeid;
306
307        // register at base communication
308        bc->registerMessageReceiver( this );
309        bc->registerEventListener( this );
310
311        // timer for auto link management
312        Timer::setInterval( 500 );
313        Timer::start();
314
315        started = true;
316        state = BaseOverlayStateInvalid;
317}
318
319void BaseOverlay::stop() {
320        logging_info("Stopping...");
321
322        // stop timer
323        Timer::stop();
324
325        // delete oberlay interface
326        if(overlayInterface != NULL) {
327                delete overlayInterface;
328                overlayInterface = NULL;
329        }
330
331        // unregister at base communication
332        bc->unregisterMessageReceiver( this );
333        bc->unregisterEventListener( this );
334
335        started = false;
336        state = BaseOverlayStateInvalid;
337}
338
339bool BaseOverlay::isStarted(){
340        return started;
341}
342
343// ----------------------------------------------------------------------------
344
345void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
346                const EndpointDescriptor& bootstrapEp) {
347
348        if(id != spovnetId){
349                logging_error("attempt to join against invalid spovnet, call initiate first");
350                return;
351        }
352
353
354        //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
355        logging_info( "Starting to join spovnet " << id.toString() <<
356                        " with nodeid " << nodeId.toString());
357
358        if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
359
360                // bootstrap against ourselfs
361                logging_debug("joining spovnet locally");
362
363                overlayInterface->joinOverlay();
364                state = BaseOverlayStateCompleted;
365                BOOST_FOREACH( NodeListener* i, nodeListeners )
366                        i->onJoinCompleted( spovnetId );
367
368                //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
369                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
370
371                logging_debug("starting overlay bootstrap module");
372                overlayBootstrap.start(this, spovnetId, nodeId);
373                overlayBootstrap.publish(bc->getEndpointDescriptor());
374
375        } else {
376
377                // bootstrap against another node
378                logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
379
380                const LinkID& lnk = bc->establishLink( bootstrapEp );
381                bootstrapLinks.push_back(lnk);
382                logging_info("join process initiated for " << id.toString() << "...");
383        }
384}
385
386void BaseOverlay::leaveSpoVNet() {
387
388        logging_info( "Leaving spovnet " << spovnetId );
389        bool ret = ( state != this->BaseOverlayStateInvalid );
390
391        logging_debug("stopping overlay bootstrap module");
392        overlayBootstrap.stop();
393        overlayBootstrap.revoke();
394
395        logging_debug( "Dropping all auto-links" );
396
397        // gather all service links
398        vector<LinkID> servicelinks;
399        BOOST_FOREACH( LinkDescriptor* ld, links ) {
400                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
401                servicelinks.push_back( ld->overlayId );
402        }
403
404        // drop all service links
405        BOOST_FOREACH( LinkID lnk, servicelinks )
406                dropLink( lnk );
407
408        // let the node leave the spovnet overlay interface
409        logging_debug( "Leaving overlay" );
410        if( overlayInterface != NULL )
411                overlayInterface->leaveOverlay();
412
413        // drop still open bootstrap links
414        BOOST_FOREACH( LinkID lnk, bootstrapLinks )
415                bc->dropLink( lnk );
416
417        // change to inalid state
418        state = BaseOverlayStateInvalid;
419        //ovl.visShutdown( ovlId, nodeId, string("") );
420
421        // inform all registered services of the event
422        BOOST_FOREACH( NodeListener* i, nodeListeners ) {
423                if( ret ) i->onLeaveCompleted( spovnetId );
424                else i->onLeaveFailed( spovnetId );
425        }
426}
427
428void BaseOverlay::createSpoVNet(const SpoVNetID& id,
429                const OverlayParameterSet& param,
430                const SecurityParameterSet& sec,
431                const QoSParameterSet& qos) {
432
433        // set the state that we are an initiator, this way incoming messages are
434        // handled correctly
435        logging_info( "creating spovnet " + id.toString() <<
436                        " with nodeid " << nodeId.toString() );
437
438        spovnetId = id;
439
440        overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
441        if( overlayInterface == NULL ) {
442                logging_fatal( "overlay structure not supported" );
443                state = BaseOverlayStateInvalid;
444
445                BOOST_FOREACH( NodeListener* i, nodeListeners )
446                        i->onJoinFailed( spovnetId );
447
448                return;
449        }
450}
451
452// ----------------------------------------------------------------------------
453
454const LinkID BaseOverlay::establishLink(
455        const EndpointDescriptor& ep, const NodeID& nodeid,
456        const ServiceID& service, const LinkID& linkid ) {
457
458        LinkID link_id = linkid;
459
460        // establish link via overlay
461        if (!nodeid.isUnspecified())
462                link_id = establishLink( nodeid, service, link_id );
463
464        // establish link directly if only ep is known
465        if (nodeid.isUnspecified())
466                establishLink( ep, service, link_id );
467
468        return link_id;
469}
470
471/// call base communication's establish link and add link mapping
472const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
473                const ServiceID& service, const LinkID& linkid ) {
474
475        // create a new link id if necessary
476        LinkID link_id = linkid;
477        if (link_id.isUnspecified()) link_id = LinkID::create();
478
479        /// find a service listener
480        if( !communicationListeners.contains( service ) ) {
481                logging_error( "No listener registered for service id=" << service.toString() );
482                return LinkID::UNSPECIFIED;
483        }
484        CommunicationListener* listener = communicationListeners.get( service );
485        assert( listener != NULL );
486
487        /// establish link and add mapping
488        logging_info("Establishing direct link " << link_id.toString()
489                << " using " << ep.toString());
490
491        // create descriptor
492        LinkDescriptor* ld = addDescriptor( link_id );
493        ld->overlayId = link_id;
494        ld->communicationId = link_id;
495        ld->listener = listener;
496        ld->service = service;
497        bc->establishLink( ep, link_id );
498
499        return link_id;
500}
501
502/// establishes a link between two arbitrary nodes
503const LinkID BaseOverlay::establishLink( const NodeID& node,
504                const ServiceID& service, const LinkID& link_id ) {
505
506        // do not establish a link to myself!
507        if (node == nodeId) return LinkID::UNSPECIFIED;
508
509        // create a link descriptor
510        LinkDescriptor* ld = createLinkDescriptor( node, service, link_id );
511
512        // create link request message with own link id
513        uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
514        LinkRequest link_request_msg(
515                nonce, &bc->getEndpointDescriptor(), false,
516                ld->overlayId, ld->localRelay );
517        OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
518        overlay_msg.encapsulate( &link_request_msg );
519        pendingLinks.insert( make_pair(nonce, ld->overlayId) );
520
521        // debug message
522        logging_debug(
523                "Sending link request with"
524                << " link id="        << ld->overlayId
525                << " node id="        << ld->remoteNode.toString()
526                << " service id="     << ld->service.toString()
527                << " local relay id=" << ld->localRelay.toString()
528                << " nonce= "         << nonce
529        );
530
531        // sending message through new link
532        sendMessage( &overlay_msg, ld );
533
534        return ld->overlayId;
535}
536
537/// drops an established link
538void BaseOverlay::dropLink(const LinkID& link) {
539        logging_debug( "Dropping link (initiated locally):" << link.toString() );
540
541        // find the link item to drop
542        LinkDescriptor* ld = getDescriptor(link);
543        if( ld == NULL ) {
544                logging_warn( "Can't drop link, link is unknown!");
545                return;
546        }
547
548        // delete all queued messages
549        if( ld->messageQueue.size() > 0 ) {
550                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
551                                << ld->messageQueue.size() << " waiting messages" );
552                ld->flushQueue();
553        }
554
555        // inform sideport and listener
556        ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
557        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
558
559        // do not drop relay links
560        if (!ld->usedAsRelay) {
561                // drop the link in base communication
562                if (ld->communicationUp) bc->dropLink( ld->communicationId );
563
564                // erase descriptor
565                eraseDescriptor( ld->overlayId );
566        } else
567                ld->dropWhenRelaysLeft = true;
568}
569
570// ----------------------------------------------------------------------------
571
572/// internal send message, always use this functions to send messages over links
573seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
574        logging_debug( "Sending data message on link " << link.toString() );
575
576        // get the mapping for this link
577        LinkDescriptor* ld = getDescriptor(link);
578        if( ld == NULL ) {
579                logging_error("Could not send message. "
580                        << "Link not found id=" << link.toString());
581                return -1;
582        }
583
584        // check if the link is up yet, if its an auto link queue message
585        if( !ld->up ) {
586                ld->markAsUsed();
587                if( ld->autolink ) {
588                        logging_info("Auto-link " << link.toString() << " not up, queue message");
589                        Data data = data_serialize( message );
590                        const_cast<Message*>(message)->dropPayload();
591                        ld->messageQueue.push_back( new Message(data) );
592                } else {
593                        logging_error("Link " << link.toString() << " not up, drop message");
594                }
595                return -1;
596        }
597
598        // compile overlay message (has service and node id)
599        OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId );
600        overmsg.encapsulate( const_cast<Message*>(message) );
601
602        // send message over relay/direct/overlay
603        return sendMessage( &overmsg, ld );
604}
605
606seqnum_t BaseOverlay::sendMessage(const Message* message,
607        const NodeID& node, const ServiceID& service) {
608
609        // find link for node and service
610        LinkDescriptor* ld = getAutoDescriptor( node, service );
611
612        // if we found no link, create an auto link
613        if( ld == NULL ) {
614
615                // debug output
616                logging_info( "No link to send message to node "
617                        << node.toString() << " found for service "
618                        << service.toString() << ". Creating auto link ..."
619                );
620
621                // this will call onlinkup on us, if everything worked we now have a mapping
622                LinkID link = LinkID::create();
623
624                // call base overlay to create a link
625                link = establishLink( node, service, link );
626                ld = getDescriptor( link );
627                if( ld == NULL ) {
628                        logging_error( "Failed to establish auto-link.");
629                        return -1;
630                }
631                ld->autolink = true;
632
633                logging_debug( "Auto-link establishment in progress to node "
634                                << node.toString() << " with link id=" << link.toString() );
635        }
636        assert(ld != NULL);
637
638        // mark the link as used, as we now send a message through it
639        ld->markAsUsed();
640
641        // send / queue message
642        return sendMessage( message, ld->overlayId );
643}
644
645// ----------------------------------------------------------------------------
646
647const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
648                const LinkID& link) const {
649
650        // return own end-point descriptor
651        if( link == LinkID::UNSPECIFIED )
652                return bc->getEndpointDescriptor();
653
654        // find link descriptor. not found -> return unspecified
655        const LinkDescriptor* ld = getDescriptor(link);
656        if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
657
658        // return endpoint-descriptor from base communication
659        return bc->getEndpointDescriptor( ld->communicationId );
660}
661
662const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
663                const NodeID& node) const {
664
665        // return own end-point descriptor
666        if( node == nodeId || node == NodeID::UNSPECIFIED )
667                return bc->getEndpointDescriptor();
668
669        // no joined and request remote descriptor? -> fail!
670        if( overlayInterface == NULL ) {
671                logging_error( "overlay interface not set, cannot resolve endpoint" );
672                return EndpointDescriptor::UNSPECIFIED();
673        }
674
675        // resolve end-point descriptor from the base-overlay routing table
676        return overlayInterface->resolveNode( node );
677}
678
679// ----------------------------------------------------------------------------
680
681bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
682        sideport = _sideport;
683        _sideport->configure( this );
684}
685
686bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
687        sideport = &SideportListener::DEFAULT;
688}
689
690// ----------------------------------------------------------------------------
691
692bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
693        logging_debug( "binding communication listener " << listener
694                        << " on serviceid " << sid.toString() );
695
696        if( communicationListeners.contains( sid ) ) {
697                logging_error( "some listener already registered for service id "
698                                << sid.toString() );
699                return false;
700        }
701
702        communicationListeners.registerItem( listener, sid );
703        return true;
704}
705
706
707bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
708        logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
709
710        if( !communicationListeners.contains( sid ) ) {
711                logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
712                return false;
713        }
714
715        if( communicationListeners.get(sid) != listener ) {
716                logging_warn( "listener bound to service id " << sid.toString()
717                                << " is different than listener trying to unbind" );
718                return false;
719        }
720
721        communicationListeners.unregisterItem( sid );
722        return true;
723}
724
725// ----------------------------------------------------------------------------
726
727bool BaseOverlay::bind(NodeListener* listener) {
728        logging_debug( "Binding node listener " << listener );
729
730        // already bound? yes-> warning
731        NodeListenerVector::iterator i =
732                find( nodeListeners.begin(), nodeListeners.end(), listener );
733        if( i != nodeListeners.end() ) {
734                logging_warn("Node listener " << listener << " is already bound!" );
735                return false;
736        }
737
738        // no-> add
739        nodeListeners.push_back( listener );
740        return true;
741}
742
743bool BaseOverlay::unbind(NodeListener* listener) {
744        logging_debug( "Unbinding node listener " << listener );
745
746        // already unbound? yes-> warning
747        NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
748        if( i == nodeListeners.end() ) {
749                logging_warn( "Node listener " << listener << " is not bound!" );
750                return false;
751        }
752
753        // no-> remove
754        nodeListeners.erase( i );
755        return true;
756}
757
758// ----------------------------------------------------------------------------
759
760void BaseOverlay::onLinkUp(const LinkID& id,
761        const address_v* local, const address_v* remote) {
762        logging_debug( "Link up with base communication link id=" << id );
763
764        // get descriptor for link
765        LinkDescriptor* ld = getDescriptor(id, true);
766
767        // handle bootstrap link we initiated
768        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
769                logging_info(
770                        "Join has been initiated by me and the link is now up. " <<
771                        "Sending out join request for SpoVNet " << spovnetId.toString()
772                );
773
774                // send join request message
775                OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId );
776                JoinRequest joinRequest( spovnetId, nodeId );
777                overlayMsg.encapsulate( &joinRequest );
778                bc->sendMessage( id, &overlayMsg );
779                return;
780        }
781
782        // no link found? -> link establishment from remote, add one!
783        if (ld == NULL) {
784                ld = addDescriptor( id );
785                logging_debug( "onLinkUp (remote request) descriptor: " << ld );
786
787                // update descriptor
788                ld->fromRemote = true;
789                ld->communicationId = id;
790                ld->communicationUp = true;
791                ld->markAsUsed();
792
793                // in this case, do not inform listener, since service it unknown
794                // -> wait for update message!
795
796        // link mapping found? -> send update message with node-id and service id
797        } else {
798                logging_debug( "onLinkUp descriptor (initiated locally):" << ld );
799
800                // note: necessary to validate the link on the remote side!
801                logging_debug( "Sending out update" <<
802                        " for service " << ld->service.toString() <<
803                        " with local node id " << nodeId.toString() <<
804                        " on link " << ld->overlayId.toString() );
805
806                // update descriptor
807                ld->markAsUsed();
808                ld->communicationUp = true;
809
810                // if link is a relayed link ->convert to direct link
811                if (ld->relay) {
812                        logging_info( "Converting to direct link: " << ld );
813                        ld->up = true;
814                        ld->relay = false;
815                        ld->localRelay = NodeID::UNSPECIFIED;
816                        OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId );
817                        overMsg.setRelayLink( ld->remoteLinkId );
818                        bc->sendMessage( ld->communicationId, &overMsg );
819                }
820
821                // compile and send update message
822                OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
823                overlayMsg.setAutoLink( ld->autolink );
824                bc->sendMessage( ld->communicationId, &overlayMsg );
825        }
826}
827
828void BaseOverlay::onLinkDown(const LinkID& id,
829        const address_v* local, const address_v* remote) {
830
831        // erase bootstrap links
832        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
833        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
834
835        // get descriptor for link
836        LinkDescriptor* ld = getDescriptor(id, true);
837        if ( ld == NULL ) return; // not found? ->ignore!
838        logging_info( "onLinkDown descriptor: " << ld );
839
840        // inform listeners about link down
841        ld->communicationUp = false;
842        ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
843        sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
844
845        // delete all queued messages (auto links)
846        if( ld->messageQueue.size() > 0 ) {
847                logging_warn( "Dropping link " << id.toString() << " that has "
848                                << ld->messageQueue.size() << " waiting messages" );
849                ld->flushQueue();
850        }
851
852        // erase mapping
853        eraseDescriptor(ld->overlayId);
854}
855
856void BaseOverlay::onLinkChanged(const LinkID& id,
857        const address_v* oldlocal, const address_v* newlocal,
858        const address_v* oldremote, const address_v* newremote) {
859
860        // get descriptor for link
861        LinkDescriptor* ld = getDescriptor(id, true);
862        if ( ld == NULL ) return; // not found? ->ignore!
863        logging_debug( "onLinkChanged descriptor: " << ld );
864
865        // inform listeners
866        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
867        sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
868
869        // autolinks: refresh timestamp
870        ld->markAsUsed();
871}
872
873void BaseOverlay::onLinkFail(const LinkID& id,
874        const address_v* local, const address_v* remote) {
875        logging_debug( "Link fail with base communication link id=" << id );
876
877        // erase bootstrap links
878        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
879        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
880
881        // get descriptor for link
882        LinkDescriptor* ld = getDescriptor(id, true);
883        if ( ld == NULL ) return; // not found? ->ignore!
884        logging_debug( "Link failed id=" << ld->overlayId.toString() );
885
886        // inform listeners
887        ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
888        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
889
890        // autolinks: refresh timestamp
891        ld->markAsUsed();
892}
893
894void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
895        const address_v* remote, const QoSParameterSet& qos) {
896        logging_debug( "Link quality changed with base communication link id=" << id );
897
898        // get descriptor for link
899        LinkDescriptor* ld = getDescriptor(id, true);
900        if ( ld == NULL ) return; // not found? ->ignore!
901        logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
902
903        // autolinks: refresh timestamp
904        ld->markAsUsed();
905}
906
907bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
908        const address_v* remote ) {
909        logging_debug("Accepting link request from " << remote->to_string() );
910        return true;
911}
912
913/// handles a message from base communication
914bool BaseOverlay::receiveMessage(const Message* message,
915                const LinkID& link, const NodeID& ) {
916        // get descriptor for link
917        LinkDescriptor* ld = getDescriptor( link, true );
918
919        // link known?
920        if (ld == NULL) { // no-> handle with unspecified params
921                logging_debug("Received message from base communication, link descriptor unknown" );
922                return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED );
923        } else { // yes -> handle with overlay link id
924                logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() );
925                return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED );
926        }
927}
928
929// ----------------------------------------------------------------------------
930
931/// handles a message from an overlay
932void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) {
933        logging_debug("Received message from overlay -- "
934                << " link id=" << link.toString()
935                << " node id=" << source.toString() );
936        handleMessage( msg, link, LinkID::UNSPECIFIED, source );
937}
938
939// ----------------------------------------------------------------------------
940
941/// handles an incoming message
942bool BaseOverlay::handleMessage( const Message* message,
943        const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) {
944        logging_debug( "Handling message: " << message->toString());
945
946        // decapsulate overlay message
947        OverlayMsg* overlayMsg =
948                const_cast<Message*>(message)->decapsulate<OverlayMsg>();
949        if( overlayMsg == NULL ) return false;
950
951        // mark the link as in action
952        LinkDescriptor* ld = getDescriptor(boLink);
953        if (ld == NULL) ld = getDescriptor(bcLink, true);
954        if (ld != NULL) {
955                ld->markAsUsed();
956                ld->markAlive();
957        }
958
959        switch ( overlayMsg->getType() ) {
960                // ---------------------------------------------------------------------
961                // Handle spovnet instance join requests
962                // ---------------------------------------------------------------------
963                case OverlayMsg::typeJoinRequest: {
964
965                        // decapsulate message
966                        JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
967                        logging_info( "Received join request for spovnet " <<
968                                        joinReq->getSpoVNetID().toString() );
969
970                        // check spovnet id
971                        if( joinReq->getSpoVNetID() != spovnetId ) {
972                                logging_error(
973                                        "Received join request for spovnet we don't handle " <<
974                                        joinReq->getSpoVNetID().toString() );
975                                return false;
976                        }
977
978                        // TODO: here you can implement mechanisms to deny joining of a node
979                        bool allow = true;
980                        logging_info( "Sending join reply for spovnet " <<
981                                        spovnetId.toString() << " to node " <<
982                                        overlayMsg->getSourceNode().toString() <<
983                                        ". Result: " << (allow ? "allowed" : "denied") );
984                        joiningNodes.push_back( overlayMsg->getSourceNode() );
985
986                        // return overlay parameters
987                        assert( overlayInterface != NULL );
988                        logging_debug( "Using bootstrap end-point "
989                                << getEndpointDescriptor().toString() )
990                        OverlayParameterSet parameters = overlayInterface->getParameters();
991                        OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId );
992                        JoinReply replyMsg( spovnetId, parameters,
993                                        allow, getEndpointDescriptor() );
994                        retmsg.encapsulate(&replyMsg);
995                        bc->sendMessage( bcLink, &retmsg );
996                        return true;
997                }
998
999                // ---------------------------------------------------------------------
1000                // handle replies to spovnet instance join requests
1001                // ---------------------------------------------------------------------
1002                case OverlayMsg::typeJoinReply: {
1003
1004                        // decapsulate message
1005                        logging_debug("received join reply message");
1006                        JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1007
1008                        // correct spovnet?
1009                        if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1010                                logging_error( "Received SpoVNet join reply for " <<
1011                                                replyMsg->getSpoVNetID().toString() <<
1012                                                " != " << spovnetId.toString() );
1013                                return false;
1014                        }
1015
1016                        // access granted? no -> fail
1017                        if( !replyMsg->getJoinAllowed() ) {
1018                                logging_error( "Our join request has been denied" );
1019
1020                                // drop initiator link
1021
1022                                if(bcLink != LinkID::UNSPECIFIED){
1023                                        bc->dropLink( bcLink );
1024
1025                                        vector<LinkID>::iterator it = std::find(
1026                                                        bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1027                                        if( it != bootstrapLinks.end() )
1028                                                bootstrapLinks.erase(it);
1029                                }
1030
1031                                // inform all registered services of the event
1032                                BOOST_FOREACH( NodeListener* i, nodeListeners )
1033                                        i->onJoinFailed( spovnetId );
1034
1035                                return true;
1036                        }
1037
1038                        // access has been granted -> continue!
1039                        logging_info("Join request has been accepted for spovnet " <<
1040                                        spovnetId.toString() );
1041
1042                        logging_debug( "Using bootstrap end-point "
1043                                << replyMsg->getBootstrapEndpoint().toString() );
1044
1045                        //
1046                        // create overlay structure from spovnet parameter set
1047                        // if we have not boostrapped yet against some other node
1048                        //
1049
1050                        if( overlayInterface == NULL ){
1051
1052                                logging_debug("first-time bootstrapping");
1053
1054                                overlayInterface = OverlayFactory::create(
1055                                        *this, replyMsg->getParam(), nodeId, this );
1056
1057                                // overlay structure supported? no-> fail!
1058                                if( overlayInterface == NULL ) {
1059                                        logging_error( "overlay structure not supported" );
1060
1061                                        if(bcLink != LinkID::UNSPECIFIED){
1062                                                bc->dropLink( bcLink );
1063
1064                                                vector<LinkID>::iterator it = std::find(
1065                                                                bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1066                                                if( it != bootstrapLinks.end() )
1067                                                        bootstrapLinks.erase(it);
1068                                        }
1069
1070                                        // inform all registered services of the event
1071                                        BOOST_FOREACH( NodeListener* i, nodeListeners )
1072                                        i->onJoinFailed( spovnetId );
1073
1074                                        return true;
1075                                }
1076
1077                                // everything ok-> join the overlay!
1078                                state = BaseOverlayStateCompleted;
1079                                overlayInterface->createOverlay();
1080
1081                                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1082
1083                                // update ovlvis
1084                                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1085
1086                                // inform all registered services of the event
1087                                BOOST_FOREACH( NodeListener* i, nodeListeners )
1088                                        i->onJoinCompleted( spovnetId );
1089
1090                        } else {
1091
1092                                // this is not the first bootstrap, just join the additional node
1093                                logging_debug("not first-time bootstrapping");
1094                                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1095
1096                        } // if( overlayInterface == NULL )
1097
1098                        return true;
1099                }
1100
1101                // ---------------------------------------------------------------------
1102                // handle data forward messages
1103                // ---------------------------------------------------------------------
1104                case OverlayMsg::typeData: {
1105
1106                        // get service
1107                        const ServiceID& service = overlayMsg->getService();
1108                        logging_debug( "received data for service " << service.toString() );
1109
1110                        // find listener
1111                        CommunicationListener* listener =
1112                                communicationListeners.get( service );
1113                        if( listener == NULL ) return true;
1114
1115                        // delegate data message
1116                        listener->onMessage( overlayMsg,
1117                                overlayMsg->getSourceNode(), ld->overlayId );
1118
1119                        return true;
1120                }
1121
1122                // ---------------------------------------------------------------------
1123                // handle update messages for link establishment
1124                // ---------------------------------------------------------------------
1125                case OverlayMsg::typeUpdate: {
1126                        logging_debug("Received type update message on link " << ld );
1127
1128                        // get info
1129                        const NodeID& sourcenode = overlayMsg->getSourceNode();
1130                        const ServiceID& service = overlayMsg->getService();
1131
1132                        // no link descriptor available -> error!
1133                        if( ld == NULL ) {
1134                                logging_warn( "received overlay update message for link " <<
1135                                                ld->overlayId.toString() << " for which we have no mapping" );
1136                                return false;
1137                        }
1138
1139                        // update our link mapping information for this link
1140                        bool changed =
1141                                ( ld->remoteNode != sourcenode ) || ( ld->service != service );
1142                        ld->remoteNode = sourcenode;
1143                        ld->service = service;
1144                        ld->autolink = overlayMsg->isAutoLink();
1145
1146                        // if our link information changed, we send out an update, too
1147                        if( changed ) {
1148                                OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId );
1149                                overMsg.setAutoLink(ld->autolink);
1150                                bc->sendMessage( ld->communicationId, &overMsg );
1151                        }
1152
1153                        // service registered? no-> error!
1154                        if( !communicationListeners.contains( service ) ) {
1155                                logging_warn( "Link up: event listener has not been registered" );
1156                                return false;
1157                        }
1158
1159                        // default or no service registered?
1160                        CommunicationListener* listener = communicationListeners.get( service );
1161                        if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1162                                logging_warn("Link up: event listener is default or null!" );
1163                                return true;
1164                        }
1165
1166                        // update descriptor
1167                        ld->listener = listener;
1168                        ld->markAsUsed();
1169                        ld->markAlive();
1170
1171                        // ask the service whether it wants to accept this link
1172                        if( !listener->onLinkRequest(sourcenode) ) {
1173
1174                                logging_debug("Link id=" << ld->overlayId.toString() <<
1175                                        " has been denied by service " << service.toString() << ", dropping link");
1176
1177                                // prevent onLinkDown calls to the service
1178                                ld->listener = &CommunicationListener::DEFAULT;
1179
1180                                // drop the link
1181                                dropLink( ld->overlayId );
1182                                return true;
1183                        }
1184
1185                        // set link up
1186                        ld->up = true;
1187                        logging_debug(
1188                                   "Link " << ld->overlayId.toString()
1189                                << " has been accepted by service " << service.toString()
1190                                << " and is now up"
1191                        );
1192
1193                        // auto links: link has been accepted -> send queued messages
1194                        if( ld->messageQueue.size() > 0 ) {
1195                                logging_info( "sending out queued messages on link " <<
1196                                        ld->overlayId.toString() );
1197                                BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1198                                        sendMessage( msg, ld->overlayId );
1199                                        delete msg;
1200                                }
1201                                ld->messageQueue.clear();
1202                        }
1203
1204                        // call the notification functions
1205                        listener->onLinkUp( ld->overlayId, sourcenode );
1206                        sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId );
1207
1208                        return true;
1209                }
1210
1211                // ---------------------------------------------------------------------
1212                // handle link request forwarded through the overlay
1213                // ---------------------------------------------------------------------
1214                case OverlayMsg::typeLinkRequest: {
1215
1216                        logging_debug( "received link request on link" );
1217
1218                        // decapsulate message
1219                        LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
1220                        const ServiceID& service = overlayMsg->getService();
1221
1222                        // is request reply?
1223                        if ( linkReq->isReply() ) {
1224
1225                                // find link
1226                                PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
1227                                if ( i == pendingLinks.end() ) {
1228                                        logging_error( "Nonce not found in link request" );
1229                                        return true;
1230                                }
1231
1232                                // debug message
1233                                logging_debug( "Link request reply received. Establishing link "
1234                                        << i->second << " to " << (linkReq->getEndpoint()->toString())
1235                                        << " for service " << service.toString()
1236                                        << " with nonce "  << linkReq->getNonce()
1237                                        << " using relay " << linkReq->getRelay().toString()
1238                                        << " and remote link id=" << linkReq->getRemoteLinkId()
1239                                );
1240
1241                                // get descriptor
1242                                LinkDescriptor* ldn = getDescriptor(i->second);
1243
1244                                // check if link request reply has a relay node ...
1245                                if (!linkReq->getRelay().isUnspecified()) { // yes->
1246                                        ldn->up = true;
1247                                        ldn->relay = true;
1248                                        if (ldn->localRelay.isUnspecified())  {
1249                                                logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn );
1250                                                showLinkState();
1251                                        }
1252                                        ldn->remoteRelay  = linkReq->getRelay();
1253                                        ldn->remoteLinkId = linkReq->getRemoteLinkId();
1254                                        ldn->remoteNode   = overlayMsg->getSourceNode();
1255
1256                                        ldn->markAlive();
1257
1258                                        // compile and send update message
1259                                        OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId );
1260                                        _overlayMsg.setAutoLink(ldn->autolink);
1261                                        sendMessage( &_overlayMsg, ldn );
1262
1263                                        // auto links: link has been accepted -> send queued messages
1264                                        if( ldn->messageQueue.size() > 0 ) {
1265                                                logging_info( "Sending out queued messages on link " <<
1266                                                        ldn->overlayId.toString() );
1267                                                BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1268                                                        sendMessage( msg, ldn->overlayId );
1269                                                        delete msg;
1270                                                }
1271                                                ldn->messageQueue.clear();
1272                                        }
1273
1274                                        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1275
1276                                        // try to establish a direct link
1277                                        ldn->communicationId =
1278                                                bc->establishLink( *linkReq->getEndpoint(), i->second );
1279                                }
1280
1281                                // no relay node-> use overlay routing
1282                                else {
1283                                        ldn->up = true;
1284
1285                                        // establish direct link
1286                                        ldn->communicationId =
1287                                                        bc->establishLink( *linkReq->getEndpoint(), i->second );
1288                                }
1289                        } else {
1290                                logging_debug( "Link request received from node id="
1291                                                << overlayMsg->getSourceNode() );
1292
1293                                // create link descriptor
1294                                LinkDescriptor* ldn =
1295                                        createLinkDescriptor(overlayMsg->getSourceNode(),
1296                                                        overlayMsg->getService(), LinkID::UNSPECIFIED );
1297                                assert(!ldn->overlayId.isUnspecified());
1298
1299                                // create reply message
1300                                OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId );
1301                                LinkRequest link_request_msg(
1302                                        linkReq->getNonce(),
1303                                        &bc->getEndpointDescriptor(),
1304                                        true, ldn->overlayId, ldn->localRelay
1305                                );
1306                                overlay_msg.encapsulate( &link_request_msg );
1307
1308                                // debug message
1309                                logging_debug( "Sending LinkRequest reply for link with nonce " <<
1310                                                linkReq->getNonce() );
1311
1312                                // if this is a relay link-> update information & inform listeners
1313                                if (!linkReq->getRelay().isUnspecified()) {
1314                                        // set flags
1315                                        ldn->up = true;
1316                                        ldn->relay = true;
1317                                        if (ldn->localRelay.isUnspecified()) {
1318                                                logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn );
1319                                                showLinkState();
1320                                        }
1321                                        ldn->remoteRelay  = linkReq->getRelay();
1322                                        ldn->remoteNode   = overlayMsg->getSourceNode();
1323                                        ldn->remoteLinkId = linkReq->getRemoteLinkId();
1324                                        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1325                                }
1326
1327                                // route message back over overlay
1328                                sendMessage( &overlay_msg, ldn );
1329                        }
1330                        return true;
1331                }
1332
1333                // ---------------------------------------------------------------------
1334                // handle relay message to forward messages
1335                // ---------------------------------------------------------------------
1336                case OverlayMsg::typeRelay: {
1337
1338                        logging_debug( "received relay request on link" );
1339
1340                        // decapsulate message
1341                        RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>();
1342
1343                        // is relay message informative?
1344                        switch (relayMsg->getType()) {
1345
1346                                // handle relay notification
1347                                case RelayMessage::typeInform: {
1348                                        logging_info("Received relay information message with"
1349                                                        << " relay " << relayMsg->getRelayNode()
1350                                                        << " destination " << relayMsg->getDestNode() );
1351
1352                                        // mark incoming link as relay
1353                                        if (ld!=NULL) ld->markAsRelay();
1354
1355                                        // am I the destination of this message? yes->
1356                                        if (relayMsg->getDestNode() == nodeId ) {
1357                                                // deliver relay message locally!
1358                                                logging_debug("Relay message reached destination. Handling the message.");
1359                                                handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1360                                                return true;
1361                                        }
1362
1363                                        // create route message
1364                                        OverlayMsg _overMsg( *overlayMsg );
1365                                        RelayMessage _relayMsg( *relayMsg );
1366                                        _relayMsg.setType( RelayMessage::typeRoute );
1367                                        _overMsg.encapsulate( &_relayMsg );
1368
1369                                        // forward message
1370                                        if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) {
1371                                                logging_info("Routing relay message to " << relayMsg->getDestNode().toString() );
1372                                                sendOverlay( &_overMsg, relayMsg->getDestNode() );
1373                                        } else {
1374                                                logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() );
1375                                                sendOverlay( &_overMsg, relayMsg->getRelayNode() );
1376                                        }
1377                                        return true;
1378                                }
1379
1380                                // handle relay routing
1381                                case RelayMessage::typeRoute: {
1382                                        logging_info("Received relay route message with"
1383                                                        << " relay " << relayMsg->getRelayNode()
1384                                                        << " destination " << relayMsg->getDestNode() );
1385
1386                                        // mark incoming link as relay
1387                                        if (ld!=NULL) ld->markAsRelay();
1388
1389                                        // am I the destination of this message? yes->
1390                                        if (relayMsg->getDestNode() == nodeId ) {
1391                                                // deliver relay message locally!
1392                                                logging_debug("Relay message reached destination. Handling the message.");
1393                                                handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );
1394                                                return true;
1395                                        }
1396
1397                                        // am I the relay for this message? yes->
1398                                        if (relayMsg->getRelayNode() == nodeId ) {
1399                                                logging_debug("I'm the relay for this message. Sending to destination.");
1400                                                OverlayMsg _overMsg( *overlayMsg );
1401                                                RelayMessage _relayMsg( *relayMsg );
1402                                                _overMsg.encapsulate(&_relayMsg);
1403
1404                                                /// this must be handled by using relay link!
1405                                                sendOverlay(&_overMsg, relayMsg->getDestNode());
1406                                                return true;
1407                                        }
1408
1409                                        // error: I'm not a relay or destination!
1410                                        logging_error("This node is neither relay nor destination. Dropping Message!");
1411                                        return true;
1412                                }
1413                                default: {
1414                                        logging_error("RelayMessage Unknown!");
1415                                        return true;
1416                                }
1417                        }
1418
1419                        break;
1420                }
1421
1422                // ---------------------------------------------------------------------
1423                // handle keep-alive messages
1424                // ---------------------------------------------------------------------
1425                case OverlayMsg::typeKeepAlive: {
1426                        logging_debug( "received keep-alive on link" );
1427                        if ( ld != NULL ) {
1428                                logging_info("Keep-Alive for "<< ld->overlayId);
1429                                ld->markAlive();
1430                        }
1431                        break;
1432                }
1433
1434                // ---------------------------------------------------------------------
1435                // handle direct link replacement messages
1436                // ---------------------------------------------------------------------
1437                case OverlayMsg::typeDirectLink: {
1438
1439                        logging_debug( "Received direct link replacement request" );
1440
1441                        LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() );
1442                        if (rld == NULL) {
1443                                logging_error("Direct link replacement: Link "
1444                                                << overlayMsg->getRelayLink() << "not found error." );
1445                                break;
1446                        }
1447                        logging_info( "Received direct link convert notification for " << rld );
1448
1449                        // set communcation link id and set it up
1450                        rld->communicationId = ld->communicationId;
1451
1452                        // this is neccessary since this link was a relay link before!
1453                        rld->communicationUp = true;
1454
1455                        // this is not a relay link anymore!
1456                        rld->relay = false;
1457                        rld->localRelay  = NodeID::UNSPECIFIED;
1458                        rld->remoteRelay = NodeID::UNSPECIFIED;
1459
1460                        // mark used and alive!
1461                        rld->markAsUsed();
1462                        rld->markAlive();
1463
1464                        // erase the original descriptor
1465                        eraseDescriptor(ld->overlayId);
1466                        break;
1467                }
1468
1469                // ---------------------------------------------------------------------
1470                // handle unknown message type
1471                // ---------------------------------------------------------------------
1472                default: {
1473                        logging_error( "received message in invalid state! don't know " <<
1474                                        "what to do with this message of type " <<
1475                                        overlayMsg->getType() );
1476                        return false;
1477                }
1478        } /* switch */
1479
1480        return false;
1481}
1482
1483// ----------------------------------------------------------------------------
1484
1485void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1486
1487        logging_debug( "broadcasting message to all known nodes " <<
1488                        "in the overlay from service " + service.toString() );
1489
1490        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1491        OverlayInterface::NodeList::iterator i = nodes.begin();
1492        for(; i != nodes.end(); i++ ) {
1493                if( *i == nodeId) continue; // don't send to ourselfs
1494                sendMessage( message, *i, service );
1495        }
1496}
1497
1498vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1499        // the known nodes _can_ also include our node, so we remove ourself
1500        vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1501
1502        vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1503        if( i != nodes.end() ) nodes.erase( i );
1504
1505        return nodes;
1506}
1507
1508const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1509        if( lid == LinkID::UNSPECIFIED ) return nodeId;
1510        const LinkDescriptor* ld = getDescriptor(lid);
1511        if( ld == NULL ) return NodeID::UNSPECIFIED;
1512        else return ld->remoteNode;
1513}
1514
1515vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1516        vector<LinkID> linkvector;
1517        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1518                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1519                        linkvector.push_back( ld->overlayId );
1520                }
1521        }
1522        return linkvector;
1523}
1524
1525
1526void BaseOverlay::onNodeJoin(const NodeID& node) {
1527        JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1528        if( i == joiningNodes.end() ) return;
1529
1530        logging_info( "node has successfully joined baseoverlay and overlay structure "
1531                        << node.toString() );
1532
1533        joiningNodes.erase( i );
1534}
1535
1536void BaseOverlay::eventFunction() {
1537
1538        // send keep-alive messages over established links
1539        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1540                if (!ld->up) continue;
1541                OverlayMsg overMsg( OverlayMsg::typeKeepAlive,
1542                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1543                sendMessage( &overMsg, ld );
1544        }
1545
1546        // iterate over all links and check for time boundaries
1547        vector<LinkDescriptor*> oldlinks;
1548        time_t now = time(NULL);
1549        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1550                // remote used as relay flag
1551                if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)
1552                        ld->usedAsRelay = false;
1553
1554                // keep alives and not up? yes-> link connection request is stale!
1555                if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
1556
1557                        // increase counter
1558                        ld->keepAliveMissed++;
1559
1560                        // missed more than four keep-alive messages (10 sec)? -> drop link
1561                        if (ld->keepAliveMissed > 4) {
1562                                logging_info( "Link connection request is stale, closing: " << ld );
1563                                oldlinks.push_back( ld );
1564                                continue;
1565                        }
1566                }
1567
1568                if (!ld->up) continue;
1569
1570                // drop links that are dropped and not used as relay
1571                if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {
1572                        oldlinks.push_back( ld );
1573                        continue;
1574                }
1575
1576                // auto-link time exceeded?
1577                if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
1578                        oldlinks.push_back( ld );
1579                        continue;
1580                }
1581
1582                // keep alives missed? yes->
1583                if ( difftime( now, ld->keepAliveTime ) > 2 ) {
1584
1585                        // increase counter
1586                        ld->keepAliveMissed++;
1587
1588                        // missed more than four keep-alive messages (4 sec)? -> drop link
1589                        if (ld->keepAliveMissed >= 8) {
1590                                logging_info( "Link is stale, closing: " << ld );
1591                                oldlinks.push_back( ld );
1592                                continue;
1593                        }
1594                }
1595        }
1596
1597        // drop links
1598        BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {
1599/*
1600                vector<LinkID>::iterator it = std::find(
1601                                bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);
1602
1603                if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){
1604                        logging_info( "Not dropping initiator link: " << ld );
1605                        continue;
1606                }
1607*/
1608                logging_info( "Link timed out. Dropping " << ld );
1609                dropLink( ld->overlayId );
1610        }
1611
1612        // show link state
1613        counter++;
1614        if (counter>=4) showLinkState();
1615        if (counter>=4 || counter<0) counter = 0;
1616}
1617
1618void BaseOverlay::showLinkState() {
1619        int i=0;
1620        logging_info("--- link state -------------------------------");
1621        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1622                logging_info("link " << i << ": " << ld);
1623                i++;
1624        }
1625        logging_info("----------------------------------------------");
1626}
1627
1628}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.