An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/BaseOverlay.cpp @ 10653

Last change on this file since 10653 was 10653, checked in by Michael Tänzer, 9 years ago

Merge the ASIO branch back into trunk

File size: 57.3 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59
60namespace ariba {
61namespace overlay {
62
63#define visualInstance          ariba::utility::DddVis::instance()
64#define visualIdOverlay         ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
65#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
66
67
68// ----------------------------------------------------------------------------
69
70/* *****************************************************************************
71 * PREREQUESITES
72 * ****************************************************************************/
73
74CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
75        if( !communicationListeners.contains( service ) ) {
76                logging_info( "No listener found for service " << service.toString() );
77                return NULL;
78        }
79        CommunicationListener* listener = communicationListeners.get( service );
80        assert( listener != NULL );
81        return listener;
82}
83
84// link descriptor handling ----------------------------------------------------
85
86LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
87        BOOST_FOREACH( LinkDescriptor* lp, links )
88                                if ((communication ? lp->communicationId : lp->overlayId) == link)
89                                        return lp;
90        return NULL;
91}
92
93const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
94        BOOST_FOREACH( const LinkDescriptor* lp, links )
95                                if ((communication ? lp->communicationId : lp->overlayId) == link)
96                                        return lp;
97        return NULL;
98}
99
100/// erases a link descriptor
101void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
102        for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
103                LinkDescriptor* ld = *i;
104                if ((communication ? ld->communicationId : ld->overlayId) == link) {
105                        delete ld;
106                        links.erase(i);
107                        break;
108                }
109        }
110}
111
112/// adds a link descriptor
113LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
114        LinkDescriptor* desc = getDescriptor( link );
115        if ( desc == NULL ) {
116                desc = new LinkDescriptor();
117                if (!link.isUnspecified()) desc->overlayId = link;
118                links.push_back(desc);
119        }
120        return desc;
121}
122
123/// returns a auto-link descriptor
124LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
125        // search for a descriptor that is already up
126        BOOST_FOREACH( LinkDescriptor* lp, links )
127                                if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
128                                        return lp;
129        // if not found, search for one that is about to come up...
130        BOOST_FOREACH( LinkDescriptor* lp, links )
131        if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
132                return lp;
133        return NULL;
134}
135
136/// stabilizes link information
137void BaseOverlay::stabilizeLinks() {
138        // send keep-alive messages over established links
139        BOOST_FOREACH( LinkDescriptor* ld, links ) {
140                if (!ld->up) continue;
141                OverlayMsg msg( OverlayMsg::typeLinkAlive,
142                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
143                if (ld->relayed) msg.setRouteRecord(true);
144                send_link( &msg, ld->overlayId );
145        }
146
147        // iterate over all links and check for time boundaries
148        vector<LinkDescriptor*> oldlinks;
149        time_t now = time(NULL);
150        BOOST_FOREACH( LinkDescriptor* ld, links ) {
151
152                // keep alives and not up? yes-> link connection request is stale!
153                if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
154
155                        // increase counter
156                        ld->keepAliveMissed++;
157
158                        // missed more than four keep-alive messages (10 sec)? -> drop link
159                        if (ld->keepAliveMissed > 4) {
160                                logging_info( "Link connection request is stale, closing: " << ld );
161                                oldlinks.push_back( ld );
162                                continue;
163                        }
164                }
165
166                if (!ld->up) continue;
167
168                // check if link is relayed and retry connecting directly
169                if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
170                        ld->retryCounter--;
171                        ld->communicationId = bc->establishLink( ld->endpoint );
172                }
173
174                // remote used as relay flag
175                if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
176                        ld->relaying = false;
177
178                // drop links that are dropped and not used as relay
179                if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
180                        oldlinks.push_back( ld );
181                        continue;
182                }
183
184                // auto-link time exceeded?
185                if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
186                        oldlinks.push_back( ld );
187                        continue;
188                }
189
190                // keep alives missed? yes->
191                if ( difftime( now, ld->keepAliveTime ) > 4 ) {
192
193                        // increase counter
194                        ld->keepAliveMissed++;
195
196                        // missed more than four keep-alive messages (4 sec)? -> drop link
197                        if (ld->keepAliveMissed >= 2) {
198                                logging_info( "Link is stale, closing: " << ld );
199                                oldlinks.push_back( ld );
200                                continue;
201                        }
202                }
203        }
204
205        // drop links
206        BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
207                logging_info( "Link timed out. Dropping " << ld );
208                ld->relaying = false;
209                dropLink( ld->overlayId );
210        }
211
212        // show link state
213        counter++;
214        if (counter>=4) showLinks();
215        if (counter>=4 || counter<0) counter = 0;
216}
217
218
219std::string BaseOverlay::getLinkHTMLInfo() {
220        std::ostringstream s;
221        vector<NodeID> nodes;
222        if (links.size()==0) {
223                s << "<h2 style=\"color=#606060\">No links established!</h2>";
224        } else {
225                s << "<h2 style=\"color=#606060\">Links</h2>";
226                s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
227                s << "<tr style=\"background-color=#ffe0e0\">";
228                s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
229                s << "</tr>";
230
231                int i=0;
232                BOOST_FOREACH( LinkDescriptor* ld, links ) {
233                        if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
234                        bool found = false;
235                        BOOST_FOREACH(NodeID& id, nodes)
236                        if (id  == ld->remoteNode) found = true;
237                        if (found) continue;
238                        i++;
239                        nodes.push_back(ld->remoteNode);
240                        if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
241                        else s << "<tr>";
242                        s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
243                        s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
244                        s << "<td>";
245                        if (ld->routeRecord.size()>1 && ld->relayed) {
246                                for (size_t i=1; i<ld->routeRecord.size(); i++)
247                                        s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
248                        } else {
249                                s << "Direct";
250                        }
251                        s << "</td>";
252                        s << "</tr>";
253                }
254                s << "</table>";
255        }
256        return s.str();
257}
258
259/// shows the current link state
260void BaseOverlay::showLinks() {
261        int i=0;
262        logging_info("--- link state -------------------------------");
263        BOOST_FOREACH( LinkDescriptor* ld, links ) {
264                string epd = "";
265                if (ld->isDirectVital())
266                        epd = getEndpointDescriptor(ld->remoteNode).toString();
267
268                logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
269                i++;
270        }
271        logging_info("----------------------------------------------");
272}
273
274/// compares two arbitrary links to the same node
275int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
276        LinkDescriptor* lhsld = getDescriptor(lhs);
277        LinkDescriptor* rhsld = getDescriptor(rhs);
278        if (lhsld==NULL || rhsld==NULL
279                        || !lhsld->up || !rhsld->up
280                        || lhsld->remoteNode != rhsld->remoteNode) return -1;
281
282        if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId)  )
283                return -1;
284
285        return 1;
286}
287
288
289// internal message delivery ---------------------------------------------------
290
291/// routes a message to its destination node
292void BaseOverlay::route( OverlayMsg* message ) {
293
294        // exceeded time-to-live? yes-> drop message
295        if (message->getNumHops() > message->getTimeToLive()) {
296                logging_warn("Message exceeded TTL. Dropping message and relay routes"
297                                "for recovery.");
298                removeRelayNode(message->getDestinationNode());
299                return;
300        }
301
302        // no-> forward message
303        else {
304                // destinastion myself? yes-> handle message
305                if (message->getDestinationNode() == nodeId) {
306                        logging_warn("Usually I should not route messages to myself!");
307                        Message msg;
308                        msg.encapsulate(message);
309                        handleMessage( &msg, NULL );
310                } else {
311                        // no->send message to next hop
312                        send( message, message->getDestinationNode() );
313                }
314        }
315}
316
317/// sends a message to another node, delivers it to the base overlay class
318seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
319        LinkDescriptor* next_link = NULL;
320
321        // drop messages to unspecified destinations
322        if (destination.isUnspecified()) return -1;
323
324        // send messages to myself -> handle message and drop warning!
325        if (destination == nodeId) {
326                logging_warn("Sent message to myself. Handling message.")
327                Message msg;
328                msg.encapsulate(message);
329                handleMessage( &msg, NULL );
330                return -1;
331        }
332
333        // use relay path?
334        if (message->isRelayed()) {
335                next_link = getRelayLinkTo( destination );
336                if (next_link != NULL) {
337                        next_link->setRelaying();
338                        return bc->sendMessage(next_link->communicationId, message);
339                } else {
340                        logging_warn("Could not send message. No relay hop found to "
341                                        << destination << " -- trying to route over overlay paths ...")
342//                      logging_error("ERROR: " << debugInformation() );
343                //                      return -1;
344                }
345        }
346
347        // last resort -> route over overlay path
348        LinkID next_id = overlayInterface->getNextLinkId( destination );
349        if (next_id.isUnspecified()) {
350                logging_warn("Could not send message. No next hop found to " <<
351                                destination );
352                logging_error("ERROR: " << debugInformation() );
353                return -1;
354        }
355
356        // get link descriptor, up and running? yes-> send message
357        next_link = getDescriptor(next_id);
358        if (next_link != NULL && next_link->up) {
359                // send message over relayed link
360                return send(message, next_link);
361        }
362
363        // no-> error, dropping message
364        else {
365                logging_warn("Could not send message. Link not known or up");
366                logging_error("ERROR: " << debugInformation() );
367                return -1;
368        }
369
370        // not reached-> fail
371        return -1;
372}
373
374/// send a message using a link descriptor, delivers it to the base overlay class
375seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
376        // check if null
377        if (ldr == NULL) {
378                logging_error("Can not send message to " << message->getDestinationAddress());
379                return -1;
380        }
381
382        // check if up
383        if (!ldr->up && !ignore_down) {
384                logging_error("Can not send message. Link not up:" << ldr );
385                logging_error("DEBUG_INFO: " << debugInformation() );
386                return -1;
387        }
388        LinkDescriptor* ld = NULL;
389
390        // handle relayed link
391        if (ldr->relayed) {
392                logging_debug("Resolving direct link for relayed link to "
393                                << ldr->remoteNode);
394                ld = getRelayLinkTo( ldr->remoteNode );
395                if (ld==NULL) {
396                        logging_error("No relay path found to link " << ldr );
397                        logging_error("DEBUG_INFO: " << debugInformation() );
398                        return -1;
399                }
400                ld->setRelaying();
401                message->setRelayed(true);
402        } else
403                ld = ldr;
404
405        // handle direct link
406        if (ld->communicationUp) {
407                logging_debug("send(): Sending message over direct link.");
408                return bc->sendMessage( ld->communicationId, message );
409        } else {
410                logging_error("send(): Could not send message. "
411                                "Not a relayed link and direct link is not up.");
412                return -1;
413        }
414        return -1;
415}
416
417seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
418                const ServiceID& service) {
419        message->setSourceNode(nodeId);
420        message->setDestinationNode(remote);
421        message->setService(service);
422        return send( message, remote );
423}
424
425seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
426        LinkDescriptor* ld = getDescriptor(link);
427        if (ld==NULL) {
428                logging_error("Cannot find descriptor to link id=" << link.toString());
429                return -1;
430        }
431        message->setSourceNode(nodeId);
432        message->setDestinationNode(ld->remoteNode);
433
434        message->setSourceLink(ld->overlayId);
435        message->setDestinationLink(ld->remoteLink);
436
437        message->setService(ld->service);
438        message->setRelayed(ld->relayed);
439        return send( message, ld, ignore_down );
440}
441
442// relay route management ------------------------------------------------------
443
444/// stabilize relay information
445void BaseOverlay::stabilizeRelays() {
446        vector<relay_route>::iterator i = relay_routes.begin();
447        while (i!=relay_routes.end() ) {
448                relay_route& route = *i;
449                LinkDescriptor* ld = getDescriptor(route.link);
450
451                // relay link still used and alive?
452                if (ld==NULL
453                                || !ld->isDirectVital()
454                                || difftime(route.used, time(NULL)) > 8) {
455                        logging_info("Forgetting relay information to node "
456                                        << route.node.toString() );
457                        i = relay_routes.erase(i);
458                } else
459                        i++;
460        }
461}
462
463void BaseOverlay::removeRelayLink( const LinkID& link ) {
464        vector<relay_route>::iterator i = relay_routes.begin();
465        while (i!=relay_routes.end() ) {
466                relay_route& route = *i;
467                if (route.link == link ) i = relay_routes.erase(i); else i++;
468        }
469}
470
471void BaseOverlay::removeRelayNode( const NodeID& remote ) {
472        vector<relay_route>::iterator i = relay_routes.begin();
473        while (i!=relay_routes.end() ) {
474                relay_route& route = *i;
475                if (route.node == remote ) i = relay_routes.erase(i); else i++;
476        }
477}
478
479/// refreshes relay information
480void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
481
482        // handle relayed messages from real links only
483        if (ld == NULL
484                        || ld->relayed
485                        || message->getSourceNode()==nodeId ) return;
486
487        // update usage information
488        if (message->isRelayed()) {
489                // try to find source node
490                BOOST_FOREACH( relay_route& route, relay_routes ) {
491                        // relay route found? yes->
492                        if ( route.node == message->getDestinationNode() ) {
493                                ld->setRelaying();
494                                route.used = time(NULL);
495                        }
496                }
497
498        }
499
500        // register relay path
501        if (message->isRegisterRelay()) {
502                // set relaying
503                ld->setRelaying();
504
505                // try to find source node
506                BOOST_FOREACH( relay_route& route, relay_routes ) {
507
508                        // relay route found? yes->
509                        if ( route.node == message->getSourceNode() ) {
510
511                                // refresh timer
512                                route.used = time(NULL);
513                                LinkDescriptor* rld = getDescriptor(route.link);
514
515                                // route has a shorter hop count or old link is dead? yes-> replace
516                                if (route.hops > message->getNumHops()
517                                                || rld == NULL
518                                                || !rld->isDirectVital()) {
519                                        logging_info("Updating relay information to node "
520                                                        << route.node.toString()
521                                                        << " reducing to " << message->getNumHops() << " hops.");
522                                        route.hops = message->getNumHops();
523                                        route.link = ld->overlayId;
524                                }
525                                return;
526                        }
527                }
528
529                // not found-> add new entry
530                relay_route route;
531                route.hops = message->getNumHops();
532                route.link = ld->overlayId;
533                route.node = message->getSourceNode();
534                route.used = time(NULL);
535                logging_info("Remembering relay information to node "
536                                << route.node.toString());
537                relay_routes.push_back(route);
538        }
539}
540
541/// returns a known "vital" relay link which is up and running
542LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
543        // try to find source node
544        BOOST_FOREACH( relay_route& route, relay_routes ) {
545                if (route.node == remote ) {
546                        LinkDescriptor* ld = getDescriptor( route.link );
547                        if (ld==NULL || !ld->isDirectVital()) return NULL; else {
548                                route.used = time(NULL);
549                                return ld;
550                        }
551                }
552        }
553        return NULL;
554}
555
556/* *****************************************************************************
557 * PUBLIC MEMBERS
558 * ****************************************************************************/
559
560use_logging_cpp(BaseOverlay);
561
562// ----------------------------------------------------------------------------
563
564BaseOverlay::BaseOverlay() :
565                        started(false),state(BaseOverlayStateInvalid),
566                        bc(NULL),
567                        nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
568                        sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
569                        counter(0) {
570}
571
572BaseOverlay::~BaseOverlay() {
573}
574
575// ----------------------------------------------------------------------------
576
577void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
578        logging_info("Starting...");
579
580        // set parameters
581        bc = &_basecomm;
582        nodeId = _nodeid;
583
584        // register at base communication
585        bc->registerMessageReceiver( this );
586        bc->registerEventListener( this );
587
588        // timer for auto link management
589        Timer::setInterval( 1000 );
590        Timer::start();
591
592        started = true;
593        state = BaseOverlayStateInvalid;
594}
595
596void BaseOverlay::stop() {
597        logging_info("Stopping...");
598
599        // stop timer
600        Timer::stop();
601
602        // delete oberlay interface
603        if(overlayInterface != NULL) {
604                delete overlayInterface;
605                overlayInterface = NULL;
606        }
607
608        // unregister at base communication
609        bc->unregisterMessageReceiver( this );
610        bc->unregisterEventListener( this );
611
612        started = false;
613        state = BaseOverlayStateInvalid;
614}
615
616bool BaseOverlay::isStarted(){
617        return started;
618}
619
620// ----------------------------------------------------------------------------
621
622void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
623                const EndpointDescriptor& bootstrapEp) {
624
625        if(id != spovnetId){
626                logging_error("attempt to join against invalid spovnet, call initiate first");
627                return;
628        }
629
630        //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
631        logging_info( "Starting to join spovnet " << id.toString() <<
632                        " with nodeid " << nodeId.toString());
633
634        if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
635
636                //** FIRST STEP - MANDATORY */
637
638                // bootstrap against ourselfs
639                logging_info("joining spovnet locally");
640
641                overlayInterface->joinOverlay();
642                state = BaseOverlayStateCompleted;
643                BOOST_FOREACH( NodeListener* i, nodeListeners )
644                        i->onJoinCompleted( spovnetId );
645
646                //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
647                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
648
649        } else {
650
651                //** SECOND STEP - OPTIONAL */
652
653                // bootstrap against another node
654                logging_info("joining spovnet remotely against " << bootstrapEp.toString());
655
656                const LinkID& lnk = bc->establishLink( bootstrapEp );
657                bootstrapLinks.push_back(lnk);
658                logging_info("join process initiated for " << id.toString() << "...");
659        }
660}
661
662
663void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
664        logging_debug("starting overlay bootstrap module");
665        overlayBootstrap.start(this, spovnetId, nodeId, modules);
666        overlayBootstrap.publish(bc->getEndpointDescriptor());
667}
668
669void BaseOverlay::stopBootstrapModules(){
670        logging_debug("stopping overlay bootstrap module");
671        overlayBootstrap.stop();
672        overlayBootstrap.revoke();
673}
674
675void BaseOverlay::leaveSpoVNet() {
676
677        logging_info( "Leaving spovnet " << spovnetId );
678        bool ret = ( state != this->BaseOverlayStateInvalid );
679
680        logging_debug( "Dropping all auto-links" );
681
682        // gather all service links
683        vector<LinkID> servicelinks;
684        BOOST_FOREACH( LinkDescriptor* ld, links ) {
685                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
686                        servicelinks.push_back( ld->overlayId );
687        }
688
689        // drop all service links
690        BOOST_FOREACH( LinkID lnk, servicelinks )
691        dropLink( lnk );
692
693        // let the node leave the spovnet overlay interface
694        logging_debug( "Leaving overlay" );
695        if( overlayInterface != NULL )
696                overlayInterface->leaveOverlay();
697
698        // drop still open bootstrap links
699        BOOST_FOREACH( LinkID lnk, bootstrapLinks )
700        bc->dropLink( lnk );
701
702        // change to inalid state
703        state = BaseOverlayStateInvalid;
704        //ovl.visShutdown( ovlId, nodeId, string("") );
705
706        visualInstance.visShutdown(visualIdOverlay, nodeId, "");
707        visualInstance.visShutdown(visualIdBase, nodeId, "");
708
709        // inform all registered services of the event
710        BOOST_FOREACH( NodeListener* i, nodeListeners ) {
711                if( ret ) i->onLeaveCompleted( spovnetId );
712                else i->onLeaveFailed( spovnetId );
713        }
714}
715
716void BaseOverlay::createSpoVNet(const SpoVNetID& id,
717                const OverlayParameterSet& param,
718                const SecurityParameterSet& sec,
719                const QoSParameterSet& qos) {
720
721        // set the state that we are an initiator, this way incoming messages are
722        // handled correctly
723        logging_info( "creating spovnet " + id.toString() <<
724                        " with nodeid " << nodeId.toString() );
725
726        spovnetId = id;
727
728        overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
729        if( overlayInterface == NULL ) {
730                logging_fatal( "overlay structure not supported" );
731                state = BaseOverlayStateInvalid;
732
733                BOOST_FOREACH( NodeListener* i, nodeListeners )
734                i->onJoinFailed( spovnetId );
735
736                return;
737        }
738
739        visualInstance.visCreate(visualIdBase, nodeId, "", "");
740        visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
741}
742
743// ----------------------------------------------------------------------------
744
745const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
746                const NodeID& remoteId, const ServiceID& service ) {
747
748        // establish link via overlay
749        if (!remoteId.isUnspecified())
750                return establishLink( remoteId, service );
751        else
752                return establishDirectLink(remoteEp, service );
753}
754
755/// call base communication's establish link and add link mapping
756const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
757                const ServiceID& service ) {
758
759        /// find a service listener
760        if( !communicationListeners.contains( service ) ) {
761                logging_error( "No listener registered for service id=" << service.toString() );
762                return LinkID::UNSPECIFIED;
763        }
764        CommunicationListener* listener = communicationListeners.get( service );
765        assert( listener != NULL );
766
767        // create descriptor
768        LinkDescriptor* ld = addDescriptor();
769        ld->relayed = false;
770        ld->listener = listener;
771        ld->service = service;
772        ld->communicationId = bc->establishLink( ep );
773
774        /// establish link and add mapping
775        logging_info("Establishing direct link " << ld->communicationId.toString()
776                        << " using " << ep.toString());
777
778        return ld->communicationId;
779}
780
781/// establishes a link between two arbitrary nodes
782const LinkID BaseOverlay::establishLink( const NodeID& remote,
783                const ServiceID& service ) {
784
785        // do not establish a link to myself!
786        if (remote == nodeId) return LinkID::UNSPECIFIED;
787
788        // create a link descriptor
789        LinkDescriptor* ld = addDescriptor();
790        ld->relayed = true;
791        ld->remoteNode = remote;
792        ld->service = service;
793        ld->listener = getListener(ld->service);
794
795        // create link request message
796        OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
797        msg.setSourceLink(ld->overlayId);
798
799        // send over relayed link
800        msg.setRelayed(true);
801        msg.setRegisterRelay(true);
802
803        // debug message
804        logging_info(
805                        "Sending link request with"
806                        << " link=" << ld->overlayId.toString()
807                        << " node=" << ld->remoteNode.toString()
808                        << " serv=" << ld->service.toString()
809        );
810
811        // sending message to node
812        send_node( &msg, ld->remoteNode, ld->service );
813
814        return ld->overlayId;
815}
816
817/// drops an established link
818void BaseOverlay::dropLink(const LinkID& link) {
819        logging_info( "Dropping link (initiated locally):" << link.toString() );
820
821        // find the link item to drop
822        LinkDescriptor* ld = getDescriptor(link);
823        if( ld == NULL ) {
824                logging_warn( "Can't drop link, link is unknown!");
825                return;
826        }
827
828        // delete all queued messages
829        if( ld->messageQueue.size() > 0 ) {
830                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
831                                << ld->messageQueue.size() << " waiting messages" );
832                ld->flushQueue();
833        }
834
835        // inform sideport and listener
836        if(ld->listener != NULL)
837                ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
838        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
839
840        // do not drop relay links
841        if (!ld->relaying) {
842                // drop the link in base communication
843                if (ld->communicationUp) bc->dropLink( ld->communicationId );
844
845                // erase descriptor
846                eraseDescriptor( ld->overlayId );
847        } else {
848                ld->dropAfterRelaying = true;
849        }
850}
851
852// ----------------------------------------------------------------------------
853
854/// internal send message, always use this functions to send messages over links
855seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
856        logging_debug( "Sending data message on link " << link.toString() );
857
858        // get the mapping for this link
859        LinkDescriptor* ld = getDescriptor(link);
860        if( ld == NULL ) {
861                logging_error("Could not send message. "
862                                << "Link not found id=" << link.toString());
863                return -1;
864        }
865
866        // check if the link is up yet, if its an auto link queue message
867        if( !ld->up ) {
868                ld->setAutoUsed();
869                if( ld->autolink ) {
870                        logging_info("Auto-link " << link.toString() << " not up, queue message");
871                        Data data = data_serialize( message );
872                        const_cast<Message*>(message)->dropPayload();
873                        ld->messageQueue.push_back( new Message(data) );
874                } else {
875                        logging_error("Link " << link.toString() << " not up, drop message");
876                }
877                return -1;
878        }
879
880        // compile overlay message (has service and node id)
881        OverlayMsg overmsg( OverlayMsg::typeData );
882        overmsg.encapsulate( const_cast<Message*>(message) );
883
884        // send message over relay/direct/overlay
885        return send_link( &overmsg, ld->overlayId );
886}
887
888
889seqnum_t BaseOverlay::sendMessage(const Message* message,
890                const NodeID& node, const ServiceID& service) {
891
892        // find link for node and service
893        LinkDescriptor* ld = getAutoDescriptor( node, service );
894
895        // if we found no link, create an auto link
896        if( ld == NULL ) {
897
898                // debug output
899                logging_info( "No link to send message to node "
900                                << node.toString() << " found for service "
901                                << service.toString() << ". Creating auto link ..."
902                );
903
904                // call base overlay to create a link
905                LinkID link = establishLink( node, service );
906                ld = getDescriptor( link );
907                if( ld == NULL ) {
908                        logging_error( "Failed to establish auto-link.");
909                        return -1;
910                }
911                ld->autolink = true;
912
913                logging_debug( "Auto-link establishment in progress to node "
914                                << node.toString() << " with link id=" << link.toString() );
915        }
916        assert(ld != NULL);
917
918        // mark the link as used, as we now send a message through it
919        ld->setAutoUsed();
920
921        // send / queue message
922        return sendMessage( message, ld->overlayId );
923}
924
925
926NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
927        const NodeID& address, const ServiceID& service) {
928   
929    if ( overlayInterface->isClosestNodeTo(address) )
930    {
931        return NodeID::UNSPECIFIED;
932    }
933       
934    const NodeID& closest_node = overlayInterface->getNextNodeId(address); 
935   
936    if ( closest_node != NodeID::UNSPECIFIED )
937    {
938        seqnum_t seqnum = sendMessage(message, closest_node, service);
939    }
940   
941    return closest_node;  // XXX return seqnum ?? tuple? closest_node via (non const) reference?
942}
943// ----------------------------------------------------------------------------
944
945const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
946                const LinkID& link) const {
947
948        // return own end-point descriptor
949        if( link.isUnspecified() )
950                return bc->getEndpointDescriptor();
951
952        // find link descriptor. not found -> return unspecified
953        const LinkDescriptor* ld = getDescriptor(link);
954        if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
955
956        // return endpoint-descriptor from base communication
957        return bc->getEndpointDescriptor( ld->communicationId );
958}
959
960const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
961                const NodeID& node) const {
962
963        // return own end-point descriptor
964        if( node == nodeId || node.isUnspecified() ) {
965                //logging_info("getEndpointDescriptor: returning self.");
966                return bc->getEndpointDescriptor();
967        }
968
969        // no joined and request remote descriptor? -> fail!
970        if( overlayInterface == NULL ) {
971                logging_error( "Overlay interface not set, cannot resolve end-point." );
972                return EndpointDescriptor::UNSPECIFIED();
973        }
974
975//      // resolve end-point descriptor from the base-overlay routing table
976//      const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
977//      if(ep.toString() != "") return ep;
978
979        // see if we can find the node in our own table
980        BOOST_FOREACH(const LinkDescriptor* ld, links){
981                if(ld->remoteNode != node) continue;
982                if(!ld->communicationUp) continue;
983                const EndpointDescriptor& ep =
984                                bc->getEndpointDescriptor(ld->communicationId);
985                if(ep != EndpointDescriptor::UNSPECIFIED()) {
986                        //logging_info("getEndpointDescriptor: using " << ld->to_string());
987                        return ep;
988                }
989        }
990
991        logging_warn( "No EndpointDescriptor found for node " << node );
992        logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
993
994        return EndpointDescriptor::UNSPECIFIED();
995}
996
997// ----------------------------------------------------------------------------
998
999bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1000        sideport = _sideport;
1001        _sideport->configure( this );
1002        return true;
1003}
1004
1005bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1006        sideport = &SideportListener::DEFAULT;
1007        return true;
1008}
1009
1010// ----------------------------------------------------------------------------
1011
1012bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1013        logging_debug( "binding communication listener " << listener
1014                        << " on serviceid " << sid.toString() );
1015
1016        if( communicationListeners.contains( sid ) ) {
1017                logging_error( "some listener already registered for service id "
1018                                << sid.toString() );
1019                return false;
1020        }
1021
1022        communicationListeners.registerItem( listener, sid );
1023        return true;
1024}
1025
1026
1027bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1028        logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1029
1030        if( !communicationListeners.contains( sid ) ) {
1031                logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1032                return false;
1033        }
1034
1035        if( communicationListeners.get(sid) != listener ) {
1036                logging_warn( "listener bound to service id " << sid.toString()
1037                                << " is different than listener trying to unbind" );
1038                return false;
1039        }
1040
1041        communicationListeners.unregisterItem( sid );
1042        return true;
1043}
1044
1045// ----------------------------------------------------------------------------
1046
1047bool BaseOverlay::bind(NodeListener* listener) {
1048        logging_debug( "Binding node listener " << listener );
1049
1050        // already bound? yes-> warning
1051        NodeListenerVector::iterator i =
1052                        find( nodeListeners.begin(), nodeListeners.end(), listener );
1053        if( i != nodeListeners.end() ) {
1054                logging_warn("Node listener " << listener << " is already bound!" );
1055                return false;
1056        }
1057
1058        // no-> add
1059        nodeListeners.push_back( listener );
1060        return true;
1061}
1062
1063bool BaseOverlay::unbind(NodeListener* listener) {
1064        logging_debug( "Unbinding node listener " << listener );
1065
1066        // already unbound? yes-> warning
1067        NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1068        if( i == nodeListeners.end() ) {
1069                logging_warn( "Node listener " << listener << " is not bound!" );
1070                return false;
1071        }
1072
1073        // no-> remove
1074        nodeListeners.erase( i );
1075        return true;
1076}
1077
1078// ----------------------------------------------------------------------------
1079
1080void BaseOverlay::onLinkUp(const LinkID& id,
1081                const address_v* local, const address_v* remote) {
1082        logging_debug( "Link up with base communication link id=" << id );
1083
1084        // get descriptor for link
1085        LinkDescriptor* ld = getDescriptor(id, true);
1086
1087        // handle bootstrap link we initiated
1088        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1089                logging_info(
1090                                "Join has been initiated by me and the link is now up. " <<
1091                                "Sending out join request for SpoVNet " << spovnetId.toString()
1092                );
1093
1094                // send join request message
1095                OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1096                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1097                JoinRequest joinRequest( spovnetId, nodeId );
1098                overlayMsg.encapsulate( &joinRequest );
1099                bc->sendMessage( id, &overlayMsg );
1100                return;
1101        }
1102
1103        // no link found? -> link establishment from remote, add one!
1104        if (ld == NULL) {
1105                ld = addDescriptor( id );
1106                logging_info( "onLinkUp (remote request) descriptor: " << ld );
1107
1108                // update descriptor
1109                ld->fromRemote = true;
1110                ld->communicationId = id;
1111                ld->communicationUp = true;
1112                ld->setAutoUsed();
1113                ld->setAlive();
1114
1115                // in this case, do not inform listener, since service it unknown
1116                // -> wait for update message!
1117
1118                // link mapping found? -> send update message with node-id and service id
1119        } else {
1120                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1121
1122                // update descriptor
1123                ld->setAutoUsed();
1124                ld->setAlive();
1125                ld->communicationUp = true;
1126                ld->fromRemote = false;
1127
1128                // if link is a relayed link->convert to direct link
1129                if (ld->relayed) {
1130                        logging_info( "Converting to direct link: " << ld );
1131                        ld->up = true;
1132                        ld->relayed = false;
1133                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1134                        overMsg.setSourceLink( ld->overlayId );
1135                        overMsg.setDestinationLink( ld->remoteLink );
1136                        send_link( &overMsg, ld->overlayId );
1137                } else {
1138                        // note: necessary to validate the link on the remote side!
1139                        logging_info( "Sending out update" <<
1140                                        " for service " << ld->service.toString() <<
1141                                        " with local node id " << nodeId.toString() <<
1142                                        " on link " << ld->overlayId.toString() );
1143
1144                        // compile and send update message
1145                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1146                        overlayMsg.setSourceLink(ld->overlayId);
1147                        overlayMsg.setAutoLink( ld->autolink );
1148                        send_link( &overlayMsg, ld->overlayId, true );
1149                }
1150        }
1151}
1152
1153void BaseOverlay::onLinkDown(const LinkID& id,
1154                const address_v* local, const address_v* remote) {
1155
1156        // erase bootstrap links
1157        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1158        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1159
1160        // get descriptor for link
1161        LinkDescriptor* ld = getDescriptor(id, true);
1162        if ( ld == NULL ) return; // not found? ->ignore!
1163        logging_info( "onLinkDown descriptor: " << ld );
1164
1165        // removing relay link information
1166        removeRelayLink(ld->overlayId);
1167
1168        // inform listeners about link down
1169        ld->communicationUp = false;
1170        if (!ld->service.isUnspecified()) {
1171                CommunicationListener* lst = getListener(ld->service);
1172                if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1173                sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1174        }
1175
1176        // delete all queued messages (auto links)
1177        if( ld->messageQueue.size() > 0 ) {
1178                logging_warn( "Dropping link " << id.toString() << " that has "
1179                                << ld->messageQueue.size() << " waiting messages" );
1180                ld->flushQueue();
1181        }
1182
1183        // erase mapping
1184        eraseDescriptor(ld->overlayId);
1185}
1186
1187void BaseOverlay::onLinkChanged(const LinkID& id,
1188                const address_v* oldlocal, const address_v* newlocal,
1189                const address_v* oldremote, const address_v* newremote) {
1190
1191        // get descriptor for link
1192        LinkDescriptor* ld = getDescriptor(id, true);
1193        if ( ld == NULL ) return; // not found? ->ignore!
1194        logging_debug( "onLinkChanged descriptor: " << ld );
1195
1196        // inform listeners
1197        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1198        sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1199
1200        // autolinks: refresh timestamp
1201        ld->setAutoUsed();
1202}
1203
1204void BaseOverlay::onLinkFail(const LinkID& id,
1205                const address_v* local, const address_v* remote) {
1206        logging_debug( "Link fail with base communication link id=" << id );
1207
1208        // erase bootstrap links
1209        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1210        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1211
1212        // get descriptor for link
1213        LinkDescriptor* ld = getDescriptor(id, true);
1214        if ( ld == NULL ) return; // not found? ->ignore!
1215        logging_debug( "Link failed id=" << ld->overlayId.toString() );
1216
1217        // inform listeners
1218        ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1219        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1220}
1221
1222void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1223                const address_v* remote, const QoSParameterSet& qos) {
1224        logging_debug( "Link quality changed with base communication link id=" << id );
1225
1226        // get descriptor for link
1227        LinkDescriptor* ld = getDescriptor(id, true);
1228        if ( ld == NULL ) return; // not found? ->ignore!
1229        logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1230}
1231
1232bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1233                const address_v* remote ) {
1234        logging_debug("Accepting link request from " << remote->to_string() );
1235        return true;
1236}
1237
1238/// handles a message from base communication
1239bool BaseOverlay::receiveMessage(const Message* message,
1240                const LinkID& link, const NodeID& ) {
1241        // get descriptor for link
1242        LinkDescriptor* ld = getDescriptor( link, true );
1243        return handleMessage( message, ld, link );
1244}
1245
1246// ----------------------------------------------------------------------------
1247
1248/// Handle spovnet instance join requests
1249bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1250
1251        // decapsulate message
1252        JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1253        logging_info( "Received join request for spovnet " <<
1254                        joinReq->getSpoVNetID().toString() );
1255
1256        // check spovnet id
1257        if( joinReq->getSpoVNetID() != spovnetId ) {
1258                logging_error(
1259                                "Received join request for spovnet we don't handle " <<
1260                                joinReq->getSpoVNetID().toString() );
1261                delete joinReq;
1262                return false;
1263        }
1264
1265        // TODO: here you can implement mechanisms to deny joining of a node
1266        bool allow = true;
1267        logging_info( "Sending join reply for spovnet " <<
1268                        spovnetId.toString() << " to node " <<
1269                        overlayMsg->getSourceNode().toString() <<
1270                        ". Result: " << (allow ? "allowed" : "denied") );
1271        joiningNodes.push_back( overlayMsg->getSourceNode() );
1272
1273        // return overlay parameters
1274        assert( overlayInterface != NULL );
1275        logging_debug( "Using bootstrap end-point "
1276                        << getEndpointDescriptor().toString() )
1277        OverlayParameterSet parameters = overlayInterface->getParameters();
1278        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1279                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1280        JoinReply replyMsg( spovnetId, parameters,
1281                        allow, getEndpointDescriptor() );
1282        retmsg.encapsulate(&replyMsg);
1283        bc->sendMessage( bcLink, &retmsg );
1284
1285        delete joinReq;
1286        return true;
1287}
1288
1289/// Handle replies to spovnet instance join requests
1290bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1291        // decapsulate message
1292        logging_debug("received join reply message");
1293        JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1294
1295        // correct spovnet?
1296        if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1297                logging_error( "Received SpoVNet join reply for " <<
1298                                replyMsg->getSpoVNetID().toString() <<
1299                                " != " << spovnetId.toString() );
1300                delete replyMsg;
1301                return false;
1302        }
1303
1304        // access granted? no -> fail
1305        if( !replyMsg->getJoinAllowed() ) {
1306                logging_error( "Our join request has been denied" );
1307
1308                // drop initiator link
1309                if( !bcLink.isUnspecified() ){
1310                        bc->dropLink( bcLink );
1311
1312                        vector<LinkID>::iterator it = std::find(
1313                                        bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1314                        if( it != bootstrapLinks.end() )
1315                                bootstrapLinks.erase(it);
1316                }
1317
1318                // inform all registered services of the event
1319                BOOST_FOREACH( NodeListener* i, nodeListeners )
1320                i->onJoinFailed( spovnetId );
1321
1322                delete replyMsg;
1323                return true;
1324        }
1325
1326        // access has been granted -> continue!
1327        logging_info("Join request has been accepted for spovnet " <<
1328                        spovnetId.toString() );
1329
1330        logging_debug( "Using bootstrap end-point "
1331                        << replyMsg->getBootstrapEndpoint().toString() );
1332
1333        // create overlay structure from spovnet parameter set
1334        // if we have not boostrapped yet against some other node
1335        if( overlayInterface == NULL ){
1336
1337                logging_debug("first-time bootstrapping");
1338
1339                overlayInterface = OverlayFactory::create(
1340                                *this, replyMsg->getParam(), nodeId, this );
1341
1342                // overlay structure supported? no-> fail!
1343                if( overlayInterface == NULL ) {
1344                        logging_error( "overlay structure not supported" );
1345
1346                        if( !bcLink.isUnspecified() ){
1347                                bc->dropLink( bcLink );
1348
1349                                vector<LinkID>::iterator it = std::find(
1350                                                bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1351                                if( it != bootstrapLinks.end() )
1352                                        bootstrapLinks.erase(it);
1353                        }
1354
1355                        // inform all registered services of the event
1356                        BOOST_FOREACH( NodeListener* i, nodeListeners )
1357                        i->onJoinFailed( spovnetId );
1358
1359                        delete replyMsg;
1360                        return true;
1361                }
1362
1363                // everything ok-> join the overlay!
1364                state = BaseOverlayStateCompleted;
1365                overlayInterface->createOverlay();
1366
1367                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1368                overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1369
1370                // update ovlvis
1371                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1372
1373                // inform all registered services of the event
1374                BOOST_FOREACH( NodeListener* i, nodeListeners )
1375                i->onJoinCompleted( spovnetId );
1376
1377                delete replyMsg;
1378
1379        } else {
1380
1381                // this is not the first bootstrap, just join the additional node
1382                logging_debug("not first-time bootstrapping");
1383                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1384                overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1385
1386                delete replyMsg;
1387
1388        } // if( overlayInterface == NULL )
1389
1390        return true;
1391}
1392
1393
1394bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1395        // get service
1396        const ServiceID& service = overlayMsg->getService();
1397        logging_debug( "Received data for service " << service.toString()
1398                        << " on link " << overlayMsg->getDestinationLink().toString() );
1399
1400        // delegate data message
1401        CommunicationListener* lst = getListener(service);
1402        if(lst != NULL){
1403                lst->onMessage(
1404                                overlayMsg,
1405                                overlayMsg->getSourceNode(),
1406                                overlayMsg->getDestinationLink()
1407                );
1408        }
1409
1410        return true;
1411}
1412
1413
1414bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1415
1416        if( ld == NULL ) {
1417                logging_warn( "received overlay update message for link for "
1418                                << "which we have no mapping" );
1419                return false;
1420        }
1421        logging_info("Received type update message on link " << ld );
1422
1423        // update our link mapping information for this link
1424        bool changed =
1425                        ( ld->remoteNode != overlayMsg->getSourceNode() )
1426                        || ( ld->service != overlayMsg->getService() );
1427
1428        // set parameters
1429        ld->up         = true;
1430        ld->remoteNode = overlayMsg->getSourceNode();
1431        ld->remoteLink = overlayMsg->getSourceLink();
1432        ld->service    = overlayMsg->getService();
1433        ld->autolink   = overlayMsg->isAutoLink();
1434
1435        // if our link information changed, we send out an update, too
1436        if( changed ) {
1437                overlayMsg->swapRoles();
1438                overlayMsg->setSourceNode(nodeId);
1439                overlayMsg->setSourceLink(ld->overlayId);
1440                overlayMsg->setService(ld->service);
1441                send( overlayMsg, ld );
1442        }
1443
1444        // service registered? no-> error!
1445        if( !communicationListeners.contains( ld->service ) ) {
1446                logging_warn( "Link up: event listener has not been registered" );
1447                return false;
1448        }
1449
1450        // default or no service registered?
1451        CommunicationListener* listener = communicationListeners.get( ld->service );
1452        if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1453                logging_warn("Link up: event listener is default or null!" );
1454                return true;
1455        }
1456
1457        // update descriptor
1458        ld->listener = listener;
1459        ld->setAutoUsed();
1460        ld->setAlive();
1461
1462        // ask the service whether it wants to accept this link
1463        if( !listener->onLinkRequest(ld->remoteNode) ) {
1464
1465                logging_debug("Link id=" << ld->overlayId.toString() <<
1466                                " has been denied by service " << ld->service.toString() << ", dropping link");
1467
1468                // prevent onLinkDown calls to the service
1469                ld->listener = &CommunicationListener::DEFAULT;
1470
1471                // drop the link
1472                dropLink( ld->overlayId );
1473                return true;
1474        }
1475
1476        // set link up
1477        ld->up = true;
1478        logging_info( "Link has been accepted by service and is up: " << ld );
1479
1480        // auto links: link has been accepted -> send queued messages
1481        if( ld->messageQueue.size() > 0 ) {
1482                logging_info( "Sending out queued messages on link " << ld );
1483                BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1484                        sendMessage( msg, ld->overlayId );
1485                        delete msg;
1486                }
1487                ld->messageQueue.clear();
1488        }
1489
1490        // call the notification functions
1491        listener->onLinkUp( ld->overlayId, ld->remoteNode );
1492        sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1493
1494        return true;
1495}
1496
1497/// handle a link request and reply
1498bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1499        logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1500
1501        //TODO: Check if a request has already been sent using getSourceLink() ...
1502
1503        // create link descriptor
1504        LinkDescriptor* ldn = addDescriptor();
1505
1506        // flags
1507        ldn->up = true;
1508        ldn->fromRemote = true;
1509        ldn->relayed = true;
1510
1511        // parameters
1512        ldn->service = overlayMsg->getService();
1513        ldn->listener = getListener(ldn->service);
1514        ldn->remoteNode = overlayMsg->getSourceNode();
1515        ldn->remoteLink = overlayMsg->getSourceLink();
1516
1517        // update time-stamps
1518        ldn->setAlive();
1519        ldn->setAutoUsed();
1520
1521        // create reply message and send back!
1522        overlayMsg->swapRoles(); // swap source/destination
1523        overlayMsg->setType(OverlayMsg::typeLinkReply);
1524        overlayMsg->setSourceLink(ldn->overlayId);
1525        overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1526        overlayMsg->setRelayed(true);
1527        send( overlayMsg, ld ); // send back to link
1528
1529        // inform listener
1530        if(ldn != NULL && ldn->listener != NULL)
1531                ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1532
1533        return true;
1534}
1535
1536bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1537
1538        // find link request
1539        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1540
1541        // not found? yes-> drop with error!
1542        if (ldn == NULL) {
1543                logging_error( "No link request pending for "
1544                                << overlayMsg->getDestinationLink().toString() );
1545                return false;
1546        }
1547        logging_debug("Handling link reply for " << ldn )
1548
1549        // check if already up
1550        if (ldn->up) {
1551                logging_warn( "Link already up: " << ldn );
1552                return true;
1553        }
1554
1555        // debug message
1556        logging_debug( "Link request reply received. Establishing link"
1557                        << " for service " << overlayMsg->getService().toString()
1558                        << " with local id=" << overlayMsg->getDestinationLink()
1559                        << " and remote link id=" << overlayMsg->getSourceLink()
1560                        << " to " << overlayMsg->getSourceEndpoint().toString()
1561        );
1562
1563        // set local link descriptor data
1564        ldn->up = true;
1565        ldn->relayed = true;
1566        ldn->service = overlayMsg->getService();
1567        ldn->listener = getListener(ldn->service);
1568        ldn->remoteLink = overlayMsg->getSourceLink();
1569        ldn->remoteNode = overlayMsg->getSourceNode();
1570
1571        // update timestamps
1572        ldn->setAlive();
1573        ldn->setAutoUsed();
1574
1575        // auto links: link has been accepted -> send queued messages
1576        if( ldn->messageQueue.size() > 0 ) {
1577                logging_info( "Sending out queued messages on link " <<
1578                                ldn->overlayId.toString() );
1579                BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1580                        sendMessage( msg, ldn->overlayId );
1581                        delete msg;
1582                }
1583                ldn->messageQueue.clear();
1584        }
1585
1586        // inform listeners about new link
1587        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1588
1589        // try to replace relay link with direct link
1590        ldn->retryCounter = 3;
1591        ldn->endpoint = overlayMsg->getSourceEndpoint();
1592        ldn->communicationId =  bc->establishLink( ldn->endpoint );
1593
1594        return true;
1595}
1596
1597/// handle a keep-alive message for a link
1598bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1599        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1600        if ( rld != NULL ) {
1601                logging_debug("Keep-Alive for " <<
1602                                overlayMsg->getDestinationLink() );
1603                if (overlayMsg->isRouteRecord())
1604                        rld->routeRecord = overlayMsg->getRouteRecord();
1605                rld->setAlive();
1606                return true;
1607        } else {
1608                logging_error("Keep-Alive for "
1609                                << overlayMsg->getDestinationLink() << ": link unknown." );
1610                return false;
1611        }
1612}
1613
1614/// handle a direct link message
1615bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1616        logging_debug( "Received direct link replacement request" );
1617
1618        /// get destination overlay link
1619        LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1620        if (rld == NULL || ld == NULL) {
1621                logging_error("Direct link replacement: Link "
1622                                << overlayMsg->getDestinationLink() << "not found error." );
1623                return false;
1624        }
1625        logging_info( "Received direct link convert notification for " << rld );
1626
1627        // update information
1628        rld->communicationId = ld->communicationId;
1629        rld->communicationUp = true;
1630        rld->relayed = false;
1631
1632        // mark used and alive!
1633        rld->setAlive();
1634        rld->setAutoUsed();
1635
1636        // erase the original descriptor
1637        eraseDescriptor(ld->overlayId);
1638        return true;
1639}
1640
1641/// handles an incoming message
1642bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1643                const LinkID bcLink ) {
1644        logging_debug( "Handling message: " << message->toString());
1645
1646        // decapsulate overlay message
1647        OverlayMsg* overlayMsg =
1648                        const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1649        if( overlayMsg == NULL ) return false;
1650
1651        // increase number of hops
1652        overlayMsg->increaseNumHops();
1653
1654        // refresh relay information
1655        refreshRelayInformation( overlayMsg, ld );
1656
1657        // update route record
1658        overlayMsg->addRouteRecord(nodeId);
1659
1660        // handle signaling messages (do not route!)
1661        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1662                        overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1663                overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1664                delete overlayMsg;
1665                return true;
1666        }
1667
1668        // message for reached destination? no-> route message
1669        if (!overlayMsg->getDestinationNode().isUnspecified() &&
1670                        overlayMsg->getDestinationNode() != nodeId ) {
1671                logging_debug("Routing message "
1672                                << " from " << overlayMsg->getSourceNode()
1673                                << " to " << overlayMsg->getDestinationNode()
1674                );
1675                route( overlayMsg );
1676                delete overlayMsg;
1677                return true;
1678        }
1679
1680        // handle base overlay message
1681        bool ret = false; // return value
1682        switch ( overlayMsg->getType() ) {
1683
1684        // data transport messages
1685        case OverlayMsg::typeData:
1686                ret = handleData(overlayMsg, ld);                       break;
1687
1688                // overlay setup messages
1689        case OverlayMsg::typeJoinRequest:
1690                ret = handleJoinRequest(overlayMsg, bcLink );   break;
1691        case OverlayMsg::typeJoinReply:
1692                ret = handleJoinReply(overlayMsg, bcLink );     break;
1693
1694                // link specific messages
1695        case OverlayMsg::typeLinkRequest:
1696                ret = handleLinkRequest(overlayMsg, ld );       break;
1697        case OverlayMsg::typeLinkReply:
1698                ret = handleLinkReply(overlayMsg, ld );         break;
1699        case OverlayMsg::typeLinkUpdate:
1700                ret = handleLinkUpdate(overlayMsg, ld );        break;
1701        case OverlayMsg::typeLinkAlive:
1702                ret = handleLinkAlive(overlayMsg, ld );         break;
1703        case OverlayMsg::typeLinkDirect:
1704                ret = handleLinkDirect(overlayMsg, ld );        break;
1705
1706                // handle unknown message type
1707        default: {
1708                logging_error( "received message in invalid state! don't know " <<
1709                                "what to do with this message of type " << overlayMsg->getType() );
1710                ret = false;
1711                break;
1712        }
1713        }
1714
1715        // free overlay message and return value
1716        delete overlayMsg;
1717        return ret;
1718}
1719
1720// ----------------------------------------------------------------------------
1721
1722void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1723
1724        logging_debug( "broadcasting message to all known nodes " <<
1725                        "in the overlay from service " + service.toString() );
1726
1727        if(message == NULL) return;
1728        message->setReleasePayload(false);
1729
1730        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1731        for(size_t i=0; i<nodes.size(); i++){
1732                NodeID& id = nodes.at(i);
1733                if(id == this->nodeId) continue; // don't send to ourselfs
1734                if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send
1735                sendMessage( message, id, service );
1736        }
1737}
1738
1739/// return the overlay neighbors
1740vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1741        // the known nodes _can_ also include our node, so we remove ourself
1742        vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1743        vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1744        if( i != nodes.end() ) nodes.erase( i );
1745        return nodes;
1746}
1747
1748const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1749        if( lid == LinkID::UNSPECIFIED ) return nodeId;
1750        const LinkDescriptor* ld = getDescriptor(lid);
1751        if( ld == NULL ) return NodeID::UNSPECIFIED;
1752        else return ld->remoteNode;
1753}
1754
1755vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1756        vector<LinkID> linkvector;
1757        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1758                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1759                        linkvector.push_back( ld->overlayId );
1760                }
1761        }
1762        return linkvector;
1763}
1764
1765
1766void BaseOverlay::onNodeJoin(const NodeID& node) {
1767        JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1768        if( i == joiningNodes.end() ) return;
1769
1770        logging_info( "node has successfully joined baseoverlay and overlay structure "
1771                        << node.toString() );
1772
1773        joiningNodes.erase( i );
1774}
1775
1776void BaseOverlay::eventFunction() {
1777        stabilizeRelays();
1778        stabilizeLinks();
1779        updateVisual();
1780}
1781
1782void BaseOverlay::updateVisual(){
1783
1784        //
1785        // update base overlay structure
1786        //
1787
1788        static NodeID pre = NodeID::UNSPECIFIED;
1789        static NodeID suc = NodeID::UNSPECIFIED;
1790
1791        vector<NodeID> nodes = this->getOverlayNeighbors(false);
1792
1793        if(nodes.size() == 0){
1794
1795                if(pre != NodeID::UNSPECIFIED){
1796                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1797                        pre = NodeID::UNSPECIFIED;
1798                }
1799                if(suc != NodeID::UNSPECIFIED){
1800                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1801                        suc = NodeID::UNSPECIFIED;
1802                }
1803
1804        } // if(nodes.size() == 0)
1805
1806        if(nodes.size() == 1){
1807                // only one node, make this pre and succ
1808                // and then go into the node.size()==2 case
1809                //nodes.push_back(nodes.at(0));
1810
1811                if(pre != nodes.at(0)){
1812                        pre = nodes.at(0);
1813                        if(pre != NodeID::UNSPECIFIED)
1814                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
1815                }
1816        }
1817
1818        if(nodes.size() == 2){
1819
1820                // old finger
1821                if(nodes.at(0) != pre){
1822                        if(pre != NodeID::UNSPECIFIED)
1823                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1824                        pre = NodeID::UNSPECIFIED;
1825                }
1826                if(nodes.at(1) != suc){
1827                        if(suc != NodeID::UNSPECIFIED)
1828                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1829                        suc = NodeID::UNSPECIFIED;
1830                }
1831
1832                // connect with fingers
1833                if(pre == NodeID::UNSPECIFIED){
1834                        pre = nodes.at(0);
1835                        if(pre != NodeID::UNSPECIFIED)
1836                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
1837                }
1838                if(suc == NodeID::UNSPECIFIED){
1839                        suc = nodes.at(1);
1840                        if(suc != NodeID::UNSPECIFIED)
1841                                visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
1842                }
1843
1844        } //if(nodes.size() == 2)
1845
1846//      {
1847//              logging_error("================================");
1848//              logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
1849//              logging_error("================================");
1850//              if(nodes.size()>= 1){
1851//                      logging_error("real pre " << nodes.at(0).toString());
1852//                      logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
1853//              }
1854//              if(nodes.size()>= 2){
1855//                      logging_error("real suc " << nodes.at(1).toString());
1856//                      logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
1857//              }
1858//              logging_error("================================");
1859//              if(pre == NodeID::UNSPECIFIED){
1860//                      logging_error("pre: unspecified");
1861//              }else{
1862//                      unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
1863//                      logging_error("pre: " << prei);
1864//              }
1865//              if(suc == NodeID::UNSPECIFIED){
1866//                      logging_error("suc: unspecified");
1867//              }else{
1868//                      unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
1869//                      logging_error("suc: " << suci);
1870//              }
1871//              logging_error("================================");
1872//      }
1873
1874        //
1875        // update base communication links
1876        //
1877
1878        static set<NodeID> linkset;
1879        set<NodeID> remotenodes;
1880        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1881                if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
1882                        continue;
1883
1884                if (ld->routeRecord.size()>1 && ld->relayed) {
1885                        for (size_t i=1; i<ld->routeRecord.size(); i++)
1886                                remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
1887                } else {
1888                        remotenodes.insert(ld->remoteNode);
1889                }
1890        }
1891
1892        // which links are old and need deletion?
1893        bool changed = false;
1894
1895        do{
1896                changed = false;
1897                BOOST_FOREACH(NodeID n, linkset){
1898                        if(remotenodes.find(n) == remotenodes.end()){
1899                                visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
1900                                linkset.erase(n);
1901                                changed = true;
1902                                break;
1903                        }
1904                }
1905        }while(changed);
1906
1907        // which links are new and need creation?
1908        do{
1909                changed = false;
1910                BOOST_FOREACH(NodeID n, remotenodes){
1911                        if(linkset.find(n) == linkset.end()){
1912                                visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
1913                                linkset.insert(n);
1914                                changed = true;
1915                                break;
1916                        }
1917                }
1918        }while(changed);
1919
1920}
1921
1922// ----------------------------------------------------------------------------
1923
1924std::string BaseOverlay::debugInformation() {
1925        std::stringstream s;
1926        int i=0;
1927
1928        // dump overlay information
1929        s << "Long debug info ... [see below]" << endl << endl;
1930        s << "--- overlay information ----------------------" << endl;
1931        s << overlayInterface->debugInformation() << endl;
1932
1933        // dump link state
1934        s << "--- link state -------------------------------" << endl;
1935        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1936                s << "link " << i << ": " << ld << endl;
1937                i++;
1938        }
1939        s << endl << endl;
1940
1941        return s.str();
1942}
1943
1944}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.