An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/BaseOverlay.cpp @ 12438

Last change on this file since 12438 was 12438, checked in by hock@…, 10 years ago

new callbacks in ariba::NodeListener?:

  • onOverlayConnected
  • onOverlayDisconnected
File size: 79.7 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59#include <ariba/utility/misc/sha1.h>
60
61namespace ariba {
62namespace overlay {
63
64using namespace std;
65using ariba::transport::system_priority;
66
67#define visualInstance          ariba::utility::DddVis::instance()
68#define visualIdOverlay         ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
69#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
70
71
72// time constants (in seconds)
73#define KEEP_ALIVE_TIME         60                      // send keep-alive message after link is not used for #s 
74
75#define LINK_ESTABLISH_TIME_OUT 10                      // timeout: link requested but not up
76#define KEEP_ALIVE_TIME_OUT     KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT     // timeout: no data received on this link (incl. keep-alive messages)
77#define AUTO_LINK_TIME_OUT      KEEP_ALIVE_TIME_OUT                    // timeout: auto link not used for #s
78
79
80// ----------------------------------------------------------------------------
81
82/* *****************************************************************************
83 * PREREQUESITES
84 * ****************************************************************************/
85
86CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
87        if( !communicationListeners.contains( service ) ) {
88                logging_info( "No listener found for service " << service.toString() );
89                return NULL;
90        }
91        CommunicationListener* listener = communicationListeners.get( service );
92        assert( listener != NULL );
93        return listener;
94}
95
96// link descriptor handling ----------------------------------------------------
97
98LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
99        foreach( LinkDescriptor* lp, links )
100                                if ((communication ? lp->communicationId : lp->overlayId) == link)
101                                        return lp;
102        return NULL;
103}
104
105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
106        foreach( const LinkDescriptor* lp, links )
107                                if ((communication ? lp->communicationId : lp->overlayId) == link)
108                                        return lp;
109        return NULL;
110}
111
112/// erases a link descriptor
113void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
114        for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
115                LinkDescriptor* ld = *i;
116                if ((communication ? ld->communicationId : ld->overlayId) == link) {
117                        delete ld;
118                        links.erase(i);
119                        break;
120                }
121        }
122}
123
124/// adds a link descriptor
125LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
126        LinkDescriptor* desc = getDescriptor( link );
127        if ( desc == NULL ) {
128                desc = new LinkDescriptor();
129                if (!link.isUnspecified()) desc->overlayId = link;
130                links.push_back(desc);
131        }
132        return desc;
133}
134
135/// returns a auto-link descriptor
136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
137{
138        // search for a descriptor that is already up
139        foreach( LinkDescriptor* lp, links )
140    {
141        if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
142            return lp;
143    }
144       
145        // if not found, search for one that is about to come up...
146        foreach( LinkDescriptor* lp, links )
147        {
148            time_t now = time(NULL);
149           
150        if (lp->autolink && lp->remoteNode == node && lp->service == service
151                && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
152            return lp;
153        }
154       
155        return NULL;
156}
157
158/// stabilizes link information
159void BaseOverlay::stabilizeLinks() {
160    time_t now = time(NULL);
161   
162    // send keep-alive messages over established links
163        foreach( LinkDescriptor* ld, links )
164    {
165                if (!ld->up) continue;
166               
167                if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
168                {
169                    logging_debug("[BaseOverlay] Sending KeepAlive over "
170                            << ld->to_string()
171                            << " after "
172                            << difftime( now, ld->keepAliveSent )
173                            << "s");
174                   
175            OverlayMsg msg( OverlayMsg::typeKeepAlive,
176                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
177            msg.setRouteRecord(true);
178            ld->keepAliveSent = now;
179            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
180                }
181        }
182
183        // iterate over all links and check for time boundaries
184        vector<LinkDescriptor*> oldlinks;
185        foreach( LinkDescriptor* ld, links ) {
186
187                // link connection request stale?
188                if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT )  // NOTE: keepAliveReceived == now, on connection request
189                {
190            logging_info( "Link connection request is stale, closing: " << ld );
191            ld->failed = true;
192            oldlinks.push_back( ld );
193            continue;
194                }
195
196                if (!ld->up) continue;
197
198               
199               
200               
201                // check if link is relayed and retry connecting directly
202                // TODO Mario: What happens here?  --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
203                if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
204                        ld->retryCounter--;
205                        ld->communicationId = bc->establishLink( ld->endpoint );
206                }
207
208                // remote used as relay flag
209                if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT)  // TODO is this a reasonable timeout ??
210                        ld->relaying = false;
211
212                // drop links that are dropped and not used as relay
213                if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
214                        oldlinks.push_back( ld );
215                        continue;
216                }
217
218                // auto-link time exceeded?
219                if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
220                        oldlinks.push_back( ld );
221                        continue;
222                }
223
224                // keep alives missed? yes->
225                if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
226                {
227            logging_info( "Link is stale, closing: " << ld );
228            ld->failed = true;
229            oldlinks.push_back( ld );
230            continue;
231                }
232        }
233
234        // drop links
235        foreach( LinkDescriptor* ld, oldlinks ) {
236                logging_info( "Link timed out. Dropping " << ld );
237                ld->relaying = false;
238                dropLink( ld->overlayId );
239        }
240
241       
242       
243       
244        // show link state  (debug output)
245        if (counter>=10 || counter<0)
246    {
247            showLinks();
248            counter = 0;
249    }
250        else
251        {
252            counter++;
253        }
254}
255
256
257std::string BaseOverlay::getLinkHTMLInfo() {
258        std::ostringstream s;
259        vector<NodeID> nodes;
260        if (links.size()==0) {
261                s << "<h2 style=\"color=#606060\">No links established!</h2>";
262        } else {
263                s << "<h2 style=\"color=#606060\">Links</h2>";
264                s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
265                s << "<tr style=\"background-color=#ffe0e0\">";
266                s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
267                s << "</tr>";
268
269                int i=0;
270                foreach( LinkDescriptor* ld, links ) {
271                        if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
272                        bool found = false;
273                        foreach(NodeID& id, nodes)
274                        if (id  == ld->remoteNode) found = true;
275                        if (found) continue;
276                        i++;
277                        nodes.push_back(ld->remoteNode);
278                        if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
279                        else s << "<tr>";
280                        s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
281                        s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
282                        s << "<td>";
283                        if (ld->routeRecord.size()>1 && ld->relayed) {
284                                for (size_t i=1; i<ld->routeRecord.size(); i++)
285                                        s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
286                        } else {
287                                s << "Direct";
288                        }
289                        s << "</td>";
290                        s << "</tr>";
291                }
292                s << "</table>";
293        }
294        return s.str();
295}
296
297/// shows the current link state
298void BaseOverlay::showLinks() {
299        int i=0;
300        logging_info("--- link state -------------------------------");
301        foreach( LinkDescriptor* ld, links ) {
302                string epd = "";
303                if (isLinkDirectVital(ld))
304                {
305//                      epd = getEndpointDescriptor(ld->remoteNode).toString();
306                   
307                    epd = "Connection: ";
308                    epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
309                    epd += " <---> ";
310                    epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
311                }
312
313                logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
314                i++;
315        }
316        logging_info("----------------------------------------------");
317}
318
319/// compares two arbitrary links to the same node
320int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
321        LinkDescriptor* lhsld = getDescriptor(lhs);
322        LinkDescriptor* rhsld = getDescriptor(rhs);
323        if (lhsld==NULL || rhsld==NULL
324                        || !lhsld->up || !rhsld->up
325                        || lhsld->remoteNode != rhsld->remoteNode) return -1;
326
327        if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId)  )
328                return -1;
329
330        return 1;
331}
332
333
334// internal message delivery ---------------------------------------------------
335
336
337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
338{
339    // set priority
340    message->setPriority(priority);
341   
342    // wrap old OverlayMsg into reboost message
343    reboost::message_t wrapped_message = message->wrap_up_for_sending();
344   
345    // send down to BaseCommunication
346    try
347    {
348        // * send *
349        return bc->sendMessage(bc_link, wrapped_message, priority, false);
350    }
351    catch ( communication::communication_message_not_sent& e )
352    {
353        ostringstream out;
354        out << "Communication message not sent: " << e.what();
355        throw message_not_sent(out.str());
356    }
357   
358    throw logic_error("This should never happen!");
359}
360
361
362/// routes a message to its destination node
363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
364{
365        // exceeded time-to-live? yes-> drop message
366        if (message->getNumHops() > message->getTimeToLive())
367        {
368                logging_warn("Message exceeded TTL. Dropping message and relay routes "
369            << "for recovery. Hop count: " << (int) message->getNumHops());
370                removeRelayNode(message->getDestinationNode());
371                return;
372        }
373
374        // no-> forward message
375        else
376        {
377                // destinastion myself? yes-> handle message
378                if (message->getDestinationNode() == nodeId)
379                {
380                        logging_warn("Usually I should not route messages to myself. And I won't!");
381                }
382
383                // no->send message to next hop
384                else
385                {
386                    try
387                    {
388                /* (deep) packet inspection to determine priority */
389                // BRANCH: typeData  -->  send with low priority
390                if ( message->getType() == OverlayMsg::typeData )
391                {
392                    // TODO think about implementing explicit routing queue (with active queue management??)
393                    send( message,
394                          message->getDestinationNode(),
395                          message->getPriority(),
396                          last_hop );
397                }
398                // BRANCH: internal message  -->  send with higher priority
399                else
400                {
401                    send( message,
402                          message->getDestinationNode(),
403                          system_priority::HIGH,
404                          last_hop );
405                }
406                    }
407                    catch ( message_not_sent& e )
408                    {
409                        logging_warn("Unable to route message of type "
410                                << message->getType()
411                                << " to "
412                                << message->getDestinationNode()
413                                << ". Reason: "
414                                << e.what());
415                       
416                        // inform sender
417                if ( message->getType() != OverlayMsg::typeMessageLost )
418                {
419                    report_lost_message(message);
420                }
421                    }
422                }
423        }
424}
425
426void BaseOverlay::report_lost_message( const OverlayMsg* message )
427{
428    OverlayMsg reply(OverlayMsg::typeMessageLost);
429    reply.setSeqNum(message->getSeqNum());
430   
431    /**
432     * MessageLost-Message
433     *
434     * - Type of lost message
435     * - Hop count of lost message
436     * - Source-LinkID  of lost message
437     */
438    reboost::shared_buffer_t b(sizeof(uint8_t)*2);
439    b.mutable_data()[0] = message->getType();
440    b.mutable_data()[1] = message->getNumHops();
441    reply.append_buffer(b);
442    reply.append_buffer(message->getSourceLink().serialize());
443   
444    try
445    {
446        send_node(&reply, message->getSourceNode(),
447                system_priority::OVERLAY,
448                OverlayInterface::OVERLAY_SERVICE_ID);
449    }
450    catch ( message_not_sent& e )
451    {
452        logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
453    }
454}
455
456/// sends a message to another node, delivers it to the base overlay class
457seqnum_t BaseOverlay::send( OverlayMsg* message,
458        const NodeID& destination,
459        uint8_t priority,
460        const NodeID& last_hop ) throw(message_not_sent)
461{
462        LinkDescriptor* next_link = NULL;
463
464        // drop messages to unspecified destinations
465        if (destination.isUnspecified())
466            throw message_not_sent("No destination specified. Drop!");
467
468        // send messages to myself -> drop!
469    // TODO maybe this is not what we want. why not just deliver this message?
470    //   There is a similar check in the route function, there it should be okay.
471        if (destination == nodeId)
472        {
473            logging_warn("Sent message to myself. Drop!");
474           
475            throw message_not_sent("Sent message to myself. Drop!");
476        }
477
478        // use relay path?
479        if (message->isRelayed())
480        {
481                next_link = getRelayLinkTo( destination );
482               
483                if (next_link != NULL)
484                {
485                        next_link->setRelaying();
486
487                        // * send message over relayed link *
488                        return send_overlaymessage_down(message, next_link->communicationId, priority);
489                }
490                else
491                {
492                        logging_warn("No relay hop found to " << destination
493                                << " -- trying to route over overlay paths ...")
494                }
495        }
496
497       
498        // last resort -> route over overlay path
499        LinkID next_id = overlayInterface->getNextLinkId( destination );
500        if ( next_id.isUnspecified() )
501        {               
502        // apperently we're the closest node --> try second best node
503        //   NOTE: This is helpful if our routing table is not up-to-date, but
504        //   may lead to circles. So we have to be careful.
505        std::vector<const LinkID*> next_ids = 
506            overlayInterface->getSortedLinkIdsTowardsNode( destination );
507           
508        for ( int i = 0; i < next_ids.size(); i++ )
509        {
510            const LinkID& link = *next_ids[i];
511           
512            if ( ! link.isUnspecified() )
513            {
514                next_id = link;
515               
516                break;
517            }
518        }
519     
520        // still no next hop found. drop.
521        if ( next_id.isUnspecified() )
522        {
523            logging_warn("Could not send message. No next hop found to " <<
524                destination );
525            logging_error("ERROR: " << debugInformation() );
526           
527            throw message_not_sent("No next hop found.");
528        }
529        }
530
531       
532        /* get link descriptor, do some checks and send message */
533        next_link = getDescriptor(next_id);
534   
535    // check pointer
536    if ( next_link == NULL )
537    {
538        // NOTE: this shuldn't happen
539        throw message_not_sent("Could not send message. Link not known.");
540    }
541   
542    // avoid circles
543    if ( next_link->remoteNode == last_hop )
544    {
545        // XXX logging_debug
546        logging_info("Possible next hop would create a circle: "
547            << next_link->remoteNode);
548       
549        throw message_not_sent("Could not send message. Possible next hop would create a circle.");
550    }
551   
552    // check if link is up
553        if ( ! next_link->up)
554        {
555        logging_warn("Could not send message. Link not up");
556        logging_error("ERROR: " << debugInformation() );
557       
558        throw message_not_sent("Could not send message. Link not up");
559        }
560
561        // * send message over overlay link *
562        return send(message, next_link, priority);
563}
564
565
566/// send a message using a link descriptor, delivers it to the base overlay class
567seqnum_t BaseOverlay::send( OverlayMsg* message,
568        LinkDescriptor* ldr,
569        uint8_t priority ) throw(message_not_sent)
570{
571        // check if null
572        if (ldr == NULL)
573        {
574        ostringstream out;
575        out << "Can not send message to " << message->getDestinationAddress();
576        throw message_not_sent(out.str());
577        }
578
579        // check if up
580        if ( !ldr->up )
581        {
582                logging_error("DEBUG_INFO: " << debugInformation() );
583
584        ostringstream out;
585        out << "Can not send message. Link not up:" << ldr->to_string();
586        throw message_not_sent(out.str());
587        }
588       
589        LinkDescriptor* next_hop_ld = NULL;
590
591        // BRANCH: relayed link
592        if (ldr->relayed)
593        {
594                logging_debug("Resolving direct link for relayed link to "
595                                << ldr->remoteNode);
596               
597                next_hop_ld = getRelayLinkTo( ldr->remoteNode );
598               
599                if (next_hop_ld==NULL)
600                {
601                        logging_error("DEBUG_INFO: " << debugInformation() );
602                       
603                ostringstream out;
604                out << "No relay path found to link: " << ldr;
605                throw message_not_sent(out.str());
606                }
607               
608                next_hop_ld->setRelaying();
609                message->setRelayed(true);
610        }
611        // BRANCH: direct link
612        else
613        {
614                next_hop_ld = ldr;
615        }
616
617       
618        // check next hop-link
619        if ( ! next_hop_ld->communicationUp)
620        {
621            throw message_not_sent( "send(): Could not send message."
622                    " Not a relayed link and direct link is not up." );
623        }
624
625        // send over next link
626    logging_debug("send(): Sending message over direct link.");
627    return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
628
629}
630
631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
632        uint8_t priority, const ServiceID& service) throw(message_not_sent)
633{
634        message->setSourceNode(nodeId);
635        message->setDestinationNode(remote);
636        message->setService(service);
637        return send( message, remote, priority );
638}
639
640void BaseOverlay::send_link( OverlayMsg* message,
641        const LinkID& link,
642        uint8_t priority ) throw(message_not_sent)
643{
644        LinkDescriptor* ld = getDescriptor(link);
645        if (ld==NULL)
646        {
647            throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
648        }
649       
650        message->setSourceNode(nodeId);
651        message->setDestinationNode(ld->remoteNode);
652
653        message->setSourceLink(ld->overlayId);
654        message->setDestinationLink(ld->remoteLink);
655
656        message->setService(ld->service);
657        message->setRelayed(ld->relayed);
658   
659   
660    try
661    {
662        // * send message *
663        send( message, ld, priority );
664    }
665    catch ( message_not_sent& e )
666    {
667        // drop failed link
668        ld->failed = true;
669        dropLink(ld->overlayId);
670    }
671}
672
673// relay route management ------------------------------------------------------
674
675/// stabilize relay information
676void BaseOverlay::stabilizeRelays() {
677        vector<relay_route>::iterator i = relay_routes.begin();
678        while (i!=relay_routes.end() ) {
679                relay_route& route = *i;
680                LinkDescriptor* ld = getDescriptor(route.link);
681
682                // relay link still used and alive?
683                if (ld==NULL
684                                || !isLinkDirectVital(ld)
685                                || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT)  // TODO this was set to 8 before.. Is the new timeout better?
686                {
687                        logging_info("Forgetting relay information to node "
688                                        << route.node.toString() );
689                        i = relay_routes.erase(i);
690                } else
691                        i++;
692        }
693}
694
695void BaseOverlay::removeRelayLink( const LinkID& link ) {
696        vector<relay_route>::iterator i = relay_routes.begin();
697        while (i!=relay_routes.end() ) {
698                relay_route& route = *i;
699                if (route.link == link ) i = relay_routes.erase(i); else i++;
700        }
701}
702
703void BaseOverlay::removeRelayNode( const NodeID& remote ) {
704        vector<relay_route>::iterator i = relay_routes.begin();
705        while (i!=relay_routes.end() ) {
706                relay_route& route = *i;
707                if (route.node == remote ) i = relay_routes.erase(i); else i++;
708        }
709}
710
711/// refreshes relay information
712void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
713
714        // handle relayed messages from real links only
715        if (ld == NULL
716                        || ld->relayed
717                        || message->getSourceNode()==nodeId ) return;
718
719        // update usage information
720        if (message->isRelayed()) {
721                // try to find source node
722                foreach( relay_route& route, relay_routes ) {
723                        // relay route found? yes->
724                        if ( route.node == message->getDestinationNode() ) {
725                                ld->setRelaying();
726                                route.used = time(NULL);
727                        }
728                }
729
730        }
731
732        // register relay path
733        if (message->isRegisterRelay()) {
734                // set relaying
735                ld->setRelaying();
736
737                // try to find source node
738                foreach( relay_route& route, relay_routes ) {
739
740                        // relay route found? yes->
741                        if ( route.node == message->getSourceNode() ) {
742
743                                // refresh timer
744                                route.used = time(NULL);
745                                LinkDescriptor* rld = getDescriptor(route.link);
746
747                                // route has a shorter hop count or old link is dead? yes-> replace
748                                if (route.hops > message->getNumHops()
749                                                || rld == NULL
750                                                || !isLinkDirectVital(ld)) {
751                                        logging_info("Updating relay information to node "
752                                                        << route.node.toString()
753                                                        << " reducing to " << (int) message->getNumHops() << " hops.");
754                                        route.hops = message->getNumHops();
755                                        route.link = ld->overlayId;
756                                }
757                                return;
758                        }
759                }
760
761                // not found-> add new entry
762                relay_route route;
763                route.hops = message->getNumHops();
764                route.link = ld->overlayId;
765                route.node = message->getSourceNode();
766                route.used = time(NULL);
767                logging_info("Remembering relay information to node "
768                                << route.node.toString());
769                relay_routes.push_back(route);
770        }
771}
772
773/// returns a known "vital" relay link which is up and running
774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
775        // try to find source node
776        foreach( relay_route& route, relay_routes ) {
777                if (route.node == remote ) {
778                        LinkDescriptor* ld = getDescriptor( route.link );
779                        if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
780                                route.used = time(NULL);
781                                return ld;
782                        }
783                }
784        }
785        return NULL;
786}
787
788/* *****************************************************************************
789 * PUBLIC MEMBERS
790 * ****************************************************************************/
791
792use_logging_cpp(BaseOverlay);
793
794// ----------------------------------------------------------------------------
795
796BaseOverlay::BaseOverlay() :
797                        started(false),
798                        connected(false),
799                        state(BaseOverlayStateInvalid),
800                        bc(NULL),
801                        nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
802                        sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
803                        counter(0) {
804}
805
806BaseOverlay::~BaseOverlay() {
807}
808
809// ----------------------------------------------------------------------------
810
811void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
812        logging_info("Starting...");
813
814        // set parameters
815        bc = _basecomm;
816        nodeId = _nodeid;
817
818        // register at base communication
819        bc->registerMessageReceiver( this );
820        bc->registerEventListener( this );
821
822        // timer for auto link management
823        Timer::setInterval( 1000 ); // XXX
824//      Timer::setInterval( 10000 );
825        Timer::start();
826
827        started = true;
828        state = BaseOverlayStateInvalid;
829}
830
831void BaseOverlay::stop() {
832        logging_info("Stopping...");
833
834        // stop timer
835        Timer::stop();
836
837        // delete oberlay interface
838        if(overlayInterface != NULL) {
839                delete overlayInterface;
840                overlayInterface = NULL;
841        }
842
843        // unregister at base communication
844        bc->unregisterMessageReceiver( this );
845        bc->unregisterEventListener( this );
846
847        started = false;
848        state = BaseOverlayStateInvalid;
849}
850
851bool BaseOverlay::isStarted(){
852        return started;
853}
854
855// ----------------------------------------------------------------------------
856
857void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
858                const EndpointDescriptor& bootstrapEp) {
859
860        if(id != spovnetId){
861                logging_error("attempt to join against invalid spovnet, call initiate first");
862                return;
863        }
864
865        //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
866        logging_info( "Starting to join spovnet " << id.toString() <<
867                        " with nodeid " << nodeId.toString());
868
869        if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
870
871                //** FIRST STEP - MANDATORY */
872
873                // bootstrap against ourselfs
874                logging_info("joining spovnet locally");
875
876                overlayInterface->joinOverlay();
877                state = BaseOverlayStateCompleted;
878                foreach( NodeListener* i, nodeListeners )
879                        i->onJoinCompleted( spovnetId );
880
881                //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
882                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
883
884        } else {
885
886                //** SECOND STEP - OPTIONAL */
887
888                // bootstrap against another node
889                logging_info("joining spovnet remotely against " << bootstrapEp.toString());
890
891                const LinkID& lnk = bc->establishLink( bootstrapEp );
892                bootstrapLinks.push_back(lnk);
893                logging_info("join process initiated for " << id.toString() << "...");
894        }
895}
896
897
898void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
899        logging_debug("starting overlay bootstrap module");
900        overlayBootstrap.start(this, spovnetId, nodeId, modules);
901        overlayBootstrap.publish(bc->getEndpointDescriptor());
902}
903
904void BaseOverlay::stopBootstrapModules(){
905        logging_debug("stopping overlay bootstrap module");
906        overlayBootstrap.stop();
907        overlayBootstrap.revoke();
908}
909
910void BaseOverlay::leaveSpoVNet() {
911
912        logging_info( "Leaving spovnet " << spovnetId );
913        bool ret = ( state != this->BaseOverlayStateInvalid );
914
915        logging_debug( "Dropping all auto-links" );
916
917        // gather all service links
918        vector<LinkID> servicelinks;
919        foreach( LinkDescriptor* ld, links )
920        {
921                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
922                        servicelinks.push_back( ld->overlayId );
923        }
924
925        // drop all service links
926        foreach( LinkID lnk, servicelinks )
927        {
928            logging_debug("Dropping service link " << lnk.toString());
929            dropLink( lnk );
930        }
931
932        // let the node leave the spovnet overlay interface
933        logging_debug( "Leaving overlay" );
934        if( overlayInterface != NULL )
935        {
936                overlayInterface->leaveOverlay();
937        }
938
939        // drop still open bootstrap links
940        foreach( LinkID lnk, bootstrapLinks )
941        {
942            logging_debug("Dropping bootstrap link " << lnk.toString());
943            bc->dropLink( lnk );
944        }
945
946        // change to inalid state
947        state = BaseOverlayStateInvalid;
948        //ovl.visShutdown( ovlId, nodeId, string("") );
949
950        visualInstance.visShutdown(visualIdOverlay, nodeId, "");
951        visualInstance.visShutdown(visualIdBase, nodeId, "");
952
953        // inform all registered services of the event
954        foreach( NodeListener* i, nodeListeners )
955        {
956                if( ret ) i->onLeaveCompleted( spovnetId );
957                else i->onLeaveFailed( spovnetId );
958        }
959}
960
961void BaseOverlay::createSpoVNet(const SpoVNetID& id,
962                const OverlayParameterSet& param,
963                const SecurityParameterSet& sec,
964                const QoSParameterSet& qos) {
965
966        // set the state that we are an initiator, this way incoming messages are
967        // handled correctly
968        logging_info( "creating spovnet " + id.toString() <<
969                        " with nodeid " << nodeId.toString() );
970
971        spovnetId = id;
972
973        overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
974        if( overlayInterface == NULL ) {
975                logging_fatal( "overlay structure not supported" );
976                state = BaseOverlayStateInvalid;
977
978                foreach( NodeListener* i, nodeListeners )
979                i->onJoinFailed( spovnetId );
980
981                return;
982        }
983
984        visualInstance.visCreate(visualIdBase, nodeId, "", "");
985        visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
986}
987
988// ----------------------------------------------------------------------------
989
990const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
991                const NodeID& remoteId, const ServiceID& service ) {
992
993        // establish link via overlay
994        if (!remoteId.isUnspecified())
995                return establishLink( remoteId, service );
996        else
997                return establishDirectLink(remoteEp, service );
998}
999
1000/// call base communication's establish link and add link mapping
1001const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
1002                const ServiceID& service ) {
1003
1004        /// find a service listener
1005        if( !communicationListeners.contains( service ) ) {
1006                logging_error( "No listener registered for service id=" << service.toString() );
1007                return LinkID::UNSPECIFIED;
1008        }
1009        CommunicationListener* listener = communicationListeners.get( service );
1010        assert( listener != NULL );
1011
1012        // create descriptor
1013        LinkDescriptor* ld = addDescriptor();
1014        ld->relayed = false;
1015        ld->listener = listener;
1016        ld->service = service;
1017        ld->communicationId = bc->establishLink( ep );
1018
1019        /// establish link and add mapping
1020        logging_info("Establishing direct link " << ld->communicationId.toString()
1021                        << " using " << ep.toString());
1022
1023        return ld->communicationId;
1024}
1025
1026/// establishes a link between two arbitrary nodes
1027const LinkID BaseOverlay::establishLink( const NodeID& remote,
1028                const ServiceID& service ) {
1029
1030    // TODO What if we already have a Link to this node and this service id?
1031   
1032        // do not establish a link to myself!
1033        if (remote == nodeId) return 
1034                LinkID::UNSPECIFIED;
1035
1036       
1037        // create a link descriptor
1038        LinkDescriptor* ld = addDescriptor();
1039        ld->relayed = true;
1040        ld->remoteNode = remote;
1041        ld->service = service;
1042        ld->listener = getListener(ld->service);
1043   
1044    // initialize sequence numbers
1045    ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
1046    logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
1047
1048        // create link request message
1049        OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
1050        msg.setSourceLink(ld->overlayId);
1051
1052        // send over relayed link
1053        msg.setRelayed(true);
1054        msg.setRegisterRelay(true);
1055//      msg.setRouteRecord(true);
1056   
1057    msg.setSeqNum(ld->last_sent_seqnum);
1058
1059        // debug message
1060        logging_info(
1061                        "Sending link request with"
1062                        << " link=" << ld->overlayId.toString()
1063                        << " node=" << ld->remoteNode.toString()
1064                        << " serv=" << ld->service.toString()
1065        );
1066
1067       
1068        // sending message to node
1069        try
1070        {
1071            // * send *
1072            seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
1073        }
1074        catch ( message_not_sent& e )
1075        {
1076            logging_warn("Link request not sent: " << e.what());
1077           
1078            // Message not sent. Cancel link request.
1079            SystemQueue::instance().scheduleCall(
1080                    boost::bind(
1081                            &BaseOverlay::__onLinkEstablishmentFailed,
1082                            this,
1083                            ld->overlayId)
1084                );
1085        }
1086       
1087        return ld->overlayId;
1088}
1089
1090/// NOTE: "id" is an Overlay-LinkID
1091void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
1092{
1093    // TODO This code redundant. But also it's not easy to aggregate in one function.
1094   
1095    // get descriptor for link
1096    LinkDescriptor* ld = getDescriptor(id, false);
1097    if ( ld == NULL ) return; // not found? ->ignore!
1098
1099    logging_debug( "__onLinkEstablishmentFaild: " << ld );
1100
1101    // removing relay link information
1102    removeRelayLink(ld->overlayId);
1103
1104    // inform listeners about link down
1105    ld->communicationUp = false;
1106    if (!ld->service.isUnspecified())
1107    {
1108        CommunicationListener* lst = getListener(ld->service);
1109        if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
1110        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1111    }
1112
1113    // delete all queued messages (auto links)
1114    if( ld->messageQueue.size() > 0 ) {
1115        logging_warn( "Dropping link " << id.toString() << " that has "
1116                << ld->messageQueue.size() << " waiting messages" );
1117        ld->flushQueue();
1118    }
1119
1120    // erase mapping
1121    eraseDescriptor(ld->overlayId);
1122}
1123
1124
1125/// drops an established link
1126void BaseOverlay::dropLink(const LinkID& link)
1127{
1128        logging_info( "Dropping link: " << link.toString() );
1129
1130        // find the link item to drop
1131        LinkDescriptor* ld = getDescriptor(link);
1132        if( ld == NULL )
1133        {
1134                logging_warn( "Can't drop link, link is unknown!");
1135                return;
1136        }
1137
1138        // delete all queued messages
1139        if( ld->messageQueue.size() > 0 )
1140        {
1141                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
1142                                << ld->messageQueue.size() << " waiting messages" );
1143                ld->flushQueue();
1144        }
1145       
1146           
1147        // inform application and remote note (but only once)
1148        //   NOTE: If we initiated the drop, this function is called twice, but on
1149        //   the second call, there is noting to do.
1150        if ( ld->up && ! ld->failed )
1151        {
1152        // inform sideport and listener
1153        if(ld->listener != NULL)
1154        {
1155            ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
1156        }
1157        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
1158       
1159        // send link-close to remote node
1160        logging_info("Sending LinkClose message to remote node.");
1161        OverlayMsg close_msg(OverlayMsg::typeLinkClose);
1162        send_link(&close_msg, link, system_priority::OVERLAY);
1163   
1164        // deactivate link
1165        ld->up = false;
1166//         ld->closing = true;
1167        }
1168       
1169        else if ( ld->failed )
1170    {
1171        // inform listener
1172        if( ld->listener != NULL )
1173        {
1174            ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1175        }
1176       
1177        ld->up = false;
1178        __removeDroppedLink(ld->overlayId);
1179    }
1180}
1181
1182/// called from typeLinkClose-handler
1183void BaseOverlay::__removeDroppedLink(const LinkID& link)
1184{
1185    // find the link item to drop
1186    LinkDescriptor* ld = getDescriptor(link);
1187    if( ld == NULL )
1188    {
1189        return;
1190    }
1191
1192    // do not drop relay links
1193    if (!ld->relaying)
1194    {
1195        // drop the link in base communication
1196        if (ld->communicationUp)
1197        {
1198            bc->dropLink( ld->communicationId );
1199        }
1200
1201        // erase descriptor
1202        eraseDescriptor( ld->overlayId );
1203    }
1204    else
1205    {
1206        ld->dropAfterRelaying = true;
1207    }
1208}
1209
1210// ----------------------------------------------------------------------------
1211
1212/// internal send message, always use this functions to send messages over links
1213const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
1214        const LinkID& link,
1215        uint8_t priority ) throw(message_not_sent)
1216{
1217        logging_debug( "Sending data message on link " << link.toString() );
1218
1219        // get the mapping for this link
1220        LinkDescriptor* ld = getDescriptor(link);
1221        if( ld == NULL )
1222        {
1223            throw message_not_sent("Could not send message. Link not found id=" + link.toString());
1224        }
1225
1226        // check if the link is up yet, if its an auto link queue message
1227        if( !ld->up )
1228        {
1229                ld->setAutoUsed();
1230                if( ld->autolink )
1231                {
1232                        logging_info("Auto-link " << link.toString() << " not up, queue message");
1233                       
1234                        // queue message
1235                LinkDescriptor::message_queue_entry msg;
1236                msg.message = message;
1237                msg.priority = priority;
1238
1239                        ld->messageQueue.push_back( msg );
1240                       
1241                        return SequenceNumber::DISABLED;  // TODO what to return if message is queued?
1242                }
1243                else
1244                {
1245                    throw message_not_sent("Link " + link.toString() + " not up, drop message");
1246                }
1247        }
1248       
1249        // TODO XXX ----> coordinate with QUIC-efforts !!
1250        // TODO aktuell: sequence numbers
1251        // TODO seqnum on fast path ?
1252        ld->last_sent_seqnum.increment();
1253       
1254        /* choose fast-path for direct links; normal overlay-path otherwise */
1255        // BRANCH: direct link
1256        if ( ld->communicationUp && !ld->relayed )
1257        {
1258            // * send down to BaseCommunication *
1259            try
1260            {
1261                bc->sendMessage(ld->communicationId, message, priority, true);
1262        }
1263        catch ( communication::communication_message_not_sent& e )
1264        {
1265            ostringstream out;
1266            out << "Communication message on fast-path not sent: " << e.what();
1267            throw message_not_sent(out.str());
1268        }
1269        }
1270
1271        // BRANCH: use (slow) overlay-path
1272        else
1273        {
1274        // compile overlay message (has service and node id)
1275        OverlayMsg overmsg( OverlayMsg::typeData );
1276        overmsg.set_payload_message(message);
1277       
1278        // set SeqNum
1279        if ( ld->transmit_seqnums )
1280        {
1281            overmsg.setSeqNum(ld->last_sent_seqnum);
1282        }
1283        logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
1284   
1285        // send message over relay/direct/overlay
1286        send_link( &overmsg, ld->overlayId, priority );
1287        }
1288       
1289        // return seqnum
1290        return ld->last_sent_seqnum;
1291}
1292
1293
1294const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
1295                const NodeID& node, uint8_t priority, const ServiceID& service) {
1296
1297        // find link for node and service
1298        LinkDescriptor* ld = getAutoDescriptor( node, service );
1299
1300        // if we found no link, create an auto link
1301        if( ld == NULL ) {
1302
1303                // debug output
1304                logging_info( "No link to send message to node "
1305                                << node.toString() << " found for service "
1306                                << service.toString() << ". Creating auto link ..."
1307                );
1308
1309                // call base overlay to create a link
1310                LinkID link = establishLink( node, service );
1311                ld = getDescriptor( link );
1312                if( ld == NULL ) {
1313                        logging_error( "Failed to establish auto-link.");
1314            throw message_not_sent("Failed to establish auto-link.");
1315                }
1316                ld->autolink = true;
1317
1318                logging_debug( "Auto-link establishment in progress to node "
1319                                << node.toString() << " with link id=" << link.toString() );
1320        }
1321        assert(ld != NULL);
1322
1323        // mark the link as used, as we now send a message through it
1324        ld->setAutoUsed();
1325
1326        // send / queue message
1327        return sendMessage( message, ld->overlayId, priority );
1328}
1329
1330
1331NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
1332        const NodeID& address, uint8_t priority, const ServiceID& service) {
1333   
1334    if ( overlayInterface->isClosestNodeTo(address) )
1335    {
1336        return NodeID::UNSPECIFIED;
1337    }
1338       
1339    const NodeID& closest_node = overlayInterface->getNextNodeId(address); 
1340   
1341    if ( closest_node != NodeID::UNSPECIFIED )
1342    {
1343        sendMessage(message, closest_node, priority, service);
1344    }
1345   
1346    return closest_node;  // return seqnum ?? tuple? closest_node via (non const) reference?
1347}
1348// ----------------------------------------------------------------------------
1349
1350const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1351                const LinkID& link) const {
1352
1353        // return own end-point descriptor
1354        if( link.isUnspecified() )
1355                return bc->getEndpointDescriptor();
1356
1357        // find link descriptor. not found -> return unspecified
1358        const LinkDescriptor* ld = getDescriptor(link);
1359        if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1360
1361        // return endpoint-descriptor from base communication
1362        return bc->getEndpointDescriptor( ld->communicationId );
1363}
1364
1365const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1366                const NodeID& node) const {
1367
1368        // return own end-point descriptor
1369        if( node == nodeId || node.isUnspecified() ) {
1370                //logging_info("getEndpointDescriptor: returning self.");
1371                return bc->getEndpointDescriptor();
1372        }
1373
1374        // no joined and request remote descriptor? -> fail!
1375        if( overlayInterface == NULL ) {
1376                logging_error( "Overlay interface not set, cannot resolve end-point." );
1377                return EndpointDescriptor::UNSPECIFIED();
1378        }
1379
1380//      // resolve end-point descriptor from the base-overlay routing table
1381//      const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1382//      if(ep.toString() != "") return ep;
1383
1384        // see if we can find the node in our own table
1385        foreach(const LinkDescriptor* ld, links){
1386                if(ld->remoteNode != node) continue;
1387                if(!ld->communicationUp) continue;
1388                const EndpointDescriptor& ep =
1389                                bc->getEndpointDescriptor(ld->communicationId);
1390                if(ep != EndpointDescriptor::UNSPECIFIED()) {
1391                        //logging_info("getEndpointDescriptor: using " << ld->to_string());
1392                        return ep;
1393                }
1394        }
1395
1396        logging_warn( "No EndpointDescriptor found for node " << node );
1397        logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
1398
1399        return EndpointDescriptor::UNSPECIFIED();
1400}
1401
1402// ----------------------------------------------------------------------------
1403
1404bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1405        sideport = _sideport;
1406        _sideport->configure( this );
1407        return true;
1408}
1409
1410bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1411        sideport = &SideportListener::DEFAULT;
1412        return true;
1413}
1414
1415// ----------------------------------------------------------------------------
1416
1417bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1418        logging_debug( "binding communication listener " << listener
1419                        << " on serviceid " << sid.toString() );
1420
1421        if( communicationListeners.contains( sid ) ) {
1422                logging_error( "some listener already registered for service id "
1423                                << sid.toString() );
1424                return false;
1425        }
1426
1427        communicationListeners.registerItem( listener, sid );
1428        return true;
1429}
1430
1431
1432bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1433        logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1434
1435        if( !communicationListeners.contains( sid ) ) {
1436                logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1437                return false;
1438        }
1439
1440        if( communicationListeners.get(sid) != listener ) {
1441                logging_warn( "listener bound to service id " << sid.toString()
1442                                << " is different than listener trying to unbind" );
1443                return false;
1444        }
1445
1446        communicationListeners.unregisterItem( sid );
1447        return true;
1448}
1449
1450// ----------------------------------------------------------------------------
1451
1452bool BaseOverlay::bind(NodeListener* listener) {
1453        logging_debug( "Binding node listener " << listener );
1454
1455        // already bound? yes-> warning
1456        NodeListenerVector::iterator i =
1457                        find( nodeListeners.begin(), nodeListeners.end(), listener );
1458        if( i != nodeListeners.end() ) {
1459                logging_warn("Node listener " << listener << " is already bound!" );
1460                return false;
1461        }
1462
1463        // no-> add
1464        nodeListeners.push_back( listener );
1465        return true;
1466}
1467
1468bool BaseOverlay::unbind(NodeListener* listener) {
1469        logging_debug( "Unbinding node listener " << listener );
1470
1471        // already unbound? yes-> warning
1472        NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1473        if( i == nodeListeners.end() ) {
1474                logging_warn( "Node listener " << listener << " is not bound!" );
1475                return false;
1476        }
1477
1478        // no-> remove
1479        nodeListeners.erase( i );
1480        return true;
1481}
1482
1483// ----------------------------------------------------------------------------
1484
1485void BaseOverlay::onLinkUp(const LinkID& id,
1486        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
1487{
1488        logging_debug( "Link up with base communication link id=" << id );
1489
1490        // get descriptor for link
1491        LinkDescriptor* ld = getDescriptor(id, true);
1492
1493        // BRANCH: handle bootstrap link we initiated
1494        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1495                logging_info(
1496                                "Join has been initiated by me and the link is now up. " <<
1497                                "LinkID: " << id.toString() <<
1498                                "Sending out join request for SpoVNet " << spovnetId.toString()
1499                );
1500
1501                // send join request message
1502                OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1503                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1504                JoinRequest joinRequest( spovnetId, nodeId );
1505                overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
1506
1507                send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1508               
1509                return;
1510        }
1511
1512        // BRANCH: link establishment from remote, add one!
1513        if (ld == NULL) {
1514                ld = addDescriptor( id );
1515                logging_info( "onLinkUp (remote request) descriptor: " << ld );
1516
1517                // update descriptor
1518                ld->fromRemote = true;
1519                ld->communicationId = id;
1520                ld->communicationUp = true;
1521                ld->setAutoUsed();
1522                ld->setAlive();
1523
1524                // in this case, do not inform listener, since service it unknown
1525                // -> wait for update message!
1526        }
1527       
1528        // BRANCH: We requested this link in the first place
1529        else
1530        {
1531                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1532
1533                // update descriptor
1534                ld->setAutoUsed();
1535                ld->setAlive();
1536                ld->communicationUp = true;
1537                ld->fromRemote = false;
1538
1539                // BRANCH: this was a relayed link before --> convert to direct link
1540                //   TODO do we really have to send a message here?
1541                if (ld->relayed)
1542                {
1543                        ld->up = true;
1544                        ld->relayed = false;
1545                        logging_info( "Converting to direct link: " << ld );
1546                       
1547                        // send message
1548                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1549                        overMsg.setSourceLink( ld->overlayId );
1550                        overMsg.setDestinationLink( ld->remoteLink );
1551                        send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
1552                       
1553                    // inform listener
1554                    if( ld->listener != NULL)
1555                        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1556                }
1557               
1558
1559        /* NOTE: Chord is opening direct-links in it's setup routine which are
1560         *   neither set to "relayed" nor to "up". To activate these links a
1561         *   typeLinkUpdate must be sent.
1562         *   
1563         * This branch is would also be taken when we had a working link before
1564         *   (ld->up == true). I'm not sure if this case does actually happen
1565         *   and whether it's tested.
1566         */
1567                else
1568                {
1569                        // note: necessary to validate the link on the remote side!
1570                        logging_info( "Sending out update" <<
1571                                        " for service " << ld->service.toString() <<
1572                                        " with local node id " << nodeId.toString() <<
1573                                        " on link " << ld->overlayId.toString() );
1574
1575                        // compile and send update message
1576                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1577                        overlayMsg.setAutoLink( ld->autolink );
1578                    overlayMsg.setSourceNode(nodeId);
1579                    overlayMsg.setDestinationNode(ld->remoteNode);
1580                    overlayMsg.setSourceLink(ld->overlayId);
1581                    overlayMsg.setDestinationLink(ld->remoteLink);
1582                    overlayMsg.setService(ld->service);
1583                    overlayMsg.setRelayed(false);
1584
1585                    // TODO ld->communicationId = id ??
1586                   
1587                    send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1588                }
1589        }
1590}
1591
1592void BaseOverlay::onLinkDown(const LinkID& id,
1593        const addressing2::EndpointPtr local,
1594        const addressing2::EndpointPtr remote)
1595{
1596        // erase bootstrap links
1597        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1598        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1599
1600        // get descriptor for link
1601        LinkDescriptor* ld = getDescriptor(id, true);
1602        if ( ld == NULL ) return; // not found? ->ignore!
1603        logging_info( "onLinkDown descriptor: " << ld );
1604
1605        // removing relay link information
1606        removeRelayLink(ld->overlayId);
1607
1608        // inform listeners about link down
1609        ld->communicationUp = false;
1610        if (!ld->service.isUnspecified()) {
1611                CommunicationListener* lst = getListener(ld->service);
1612                if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1613                sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1614        }
1615
1616        // delete all queued messages (auto links)
1617        if( ld->messageQueue.size() > 0 ) {
1618                logging_warn( "Dropping link " << id.toString() << " that has "
1619                                << ld->messageQueue.size() << " waiting messages" );
1620                ld->flushQueue();
1621        }
1622
1623        // erase mapping
1624        eraseDescriptor(ld->overlayId);
1625   
1626   
1627    // notify the application if this is the last link to a different node
1628    if ( connected )
1629    {
1630        bool active_links = false;
1631       
1632        // look for links that are still active
1633        foreach( LinkDescriptor* ld, links )
1634        {
1635            if ( isLinkDirectVital(ld) )
1636            {
1637                active_links = true;
1638                break;
1639            }
1640        }
1641
1642        if ( ! active_links )
1643        {
1644            connected = false;
1645           
1646            foreach( NodeListener* i, nodeListeners )
1647                i->onOverlayDisconnected( spovnetId );
1648        }
1649    }
1650
1651}
1652
1653
1654void BaseOverlay::onLinkFail(const LinkID& id,
1655        const addressing2::EndpointPtr local,
1656        const addressing2::EndpointPtr remote)
1657{
1658        logging_debug( "Link fail with base communication link id=" << id );
1659
1660//      // erase bootstrap links
1661//      vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1662//      if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1663//
1664//      // get descriptor for link
1665//      LinkDescriptor* ld = getDescriptor(id, true);
1666//      if ( ld == NULL ) return; // not found? ->ignore!
1667//      logging_debug( "Link failed id=" << ld->overlayId.toString() );
1668//
1669//      // inform listeners
1670//      ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1671//      sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1672       
1673        logging_debug( "  ... calling onLinkDown ..." );
1674        onLinkDown(id, local, remote);
1675}
1676
1677
1678void BaseOverlay::onLinkChanged(const LinkID& id,
1679        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
1680        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
1681{
1682    // get descriptor for link
1683    LinkDescriptor* ld = getDescriptor(id, true);
1684    if ( ld == NULL ) return; // not found? ->ignore!
1685    logging_debug( "onLinkChanged descriptor: " << ld );
1686
1687    // inform listeners
1688    ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1689    sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1690
1691    // autolinks: refresh timestamp
1692    ld->setAutoUsed();
1693}
1694
1695//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
1696//        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
1697//        const QoSParameterSet& qos)
1698//{
1699//      logging_debug( "Link quality changed with base communication link id=" << id );
1700//
1701//      // get descriptor for link
1702//      LinkDescriptor* ld = getDescriptor(id, true);
1703//      if ( ld == NULL ) return; // not found? ->ignore!
1704//      logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1705//}
1706
1707bool BaseOverlay::onLinkRequest(const LinkID& id,
1708        const addressing2::EndpointPtr local,
1709        const addressing2::EndpointPtr remote)
1710{
1711        logging_debug("Accepting link request from " << remote->to_string() );
1712       
1713        // TODO ask application..?
1714       
1715        return true;
1716}
1717
1718
1719
1720
1721/// handles a message from base communication
1722bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
1723                const LinkID& link,
1724                const NodeID&,
1725                bool bypass_overlay )
1726{
1727        // get descriptor for link
1728        LinkDescriptor* ld = getDescriptor( link, true );
1729
1730       
1731        /* choose fastpath for direct links; normal overlay-path otherwise */   
1732        if ( bypass_overlay && ld )
1733        {
1734        // message received --> link is alive
1735        ld->keepAliveReceived = time(NULL);
1736        // hop count on this link
1737        ld->hops = 0;
1738
1739       
1740        // hand over to CommunicationListener (aka Application)
1741            CommunicationListener* lst = getListener(ld->service);
1742            if ( lst != NULL )
1743            {
1744                lst->onMessage(
1745                        message,
1746                        ld->remoteNode,
1747                        ld->overlayId,
1748                    SequenceNumber::DISABLED,
1749                        NULL );
1750               
1751                return true;
1752            }
1753
1754            return false;
1755        }
1756        else
1757        {
1758            return handleMessage( message, ld, link );     
1759        }
1760}
1761
1762// ----------------------------------------------------------------------------
1763
1764/// Handle spovnet instance join requests
1765bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
1766{
1767        // decapsulate message
1768        JoinRequest joinReq;
1769        joinReq.deserialize_from_shared_buffer(message);
1770       
1771        logging_info( "Received join request for spovnet " <<
1772                        joinReq.getSpoVNetID().toString() );
1773
1774        // check spovnet id
1775        if( joinReq.getSpoVNetID() != spovnetId ) {
1776                logging_error(
1777                                "Received join request for spovnet we don't handle " <<
1778                                joinReq.getSpoVNetID().toString() );
1779
1780                return false;
1781        }
1782
1783        // TODO: here you can implement mechanisms to deny joining of a node
1784        bool allow = true;
1785        logging_info( "Sending join reply for spovnet " <<
1786                        spovnetId.toString() << " to node " <<
1787                        source.toString() <<
1788                        ". Result: " << (allow ? "allowed" : "denied") );
1789        joiningNodes.push_back( source );
1790
1791        // return overlay parameters
1792        assert( overlayInterface != NULL );
1793        logging_debug( "Using bootstrap end-point "
1794                        << getEndpointDescriptor().toString() )
1795        OverlayParameterSet parameters = overlayInterface->getParameters();
1796       
1797       
1798        // create JoinReplay Message
1799        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1800                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1801        JoinReply replyMsg( spovnetId, parameters, allow );
1802        retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
1803
1804        // XXX This is unlovely clash between the old message system and the new one,
1805        // but a.t.m. we can't migrate everything to the new system at once..
1806        // ---> Consider the EndpointDescriptor as part of the JoinReply..
1807        retmsg.append_buffer(getEndpointDescriptor().serialize());
1808       
1809        // * send *
1810        send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
1811
1812        return true;
1813}
1814
1815/// Handle replies to spovnet instance join requests
1816bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
1817{
1818        // decapsulate message
1819        logging_debug("received join reply message");
1820        JoinReply replyMsg;
1821        EndpointDescriptor endpoints;
1822        reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
1823        buff = endpoints.deserialize(buff);
1824
1825        // correct spovnet?
1826        if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
1827                logging_error( "Received SpoVNet join reply for " <<
1828                                replyMsg.getSpoVNetID().toString() <<
1829                                " != " << spovnetId.toString() );
1830
1831                return false;
1832        }
1833
1834        // access granted? no -> fail
1835        if( !replyMsg.getJoinAllowed() ) {
1836                logging_error( "Our join request has been denied" );
1837
1838                // drop initiator link
1839                if( !bcLink.isUnspecified() ){
1840                        bc->dropLink( bcLink );
1841
1842                        vector<LinkID>::iterator it = std::find(
1843                                        bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1844                        if( it != bootstrapLinks.end() )
1845                                bootstrapLinks.erase(it);
1846                }
1847
1848                // inform all registered services of the event
1849                foreach( NodeListener* i, nodeListeners )
1850                i->onJoinFailed( spovnetId );
1851
1852                return true;
1853        }
1854
1855        // access has been granted -> continue!
1856        logging_info("Join request has been accepted for spovnet " <<
1857                        spovnetId.toString() );
1858
1859        logging_debug( "Using bootstrap end-point "
1860                        << endpoints.toString() );
1861
1862        // create overlay structure from spovnet parameter set
1863        // if we have not boostrapped yet against some other node
1864        if( overlayInterface == NULL ){
1865
1866                logging_debug("first-time bootstrapping");
1867
1868                overlayInterface = OverlayFactory::create(
1869                                *this, replyMsg.getParam(), nodeId, this );
1870
1871                // overlay structure supported? no-> fail!
1872                if( overlayInterface == NULL ) {
1873                        logging_error( "overlay structure not supported" );
1874
1875                        if( !bcLink.isUnspecified() ){
1876                                bc->dropLink( bcLink );
1877
1878                                vector<LinkID>::iterator it = std::find(
1879                                                bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1880                                if( it != bootstrapLinks.end() )
1881                                        bootstrapLinks.erase(it);
1882                        }
1883
1884                        // inform all registered services of the event
1885                        foreach( NodeListener* i, nodeListeners )
1886                        i->onJoinFailed( spovnetId );
1887
1888                        return true;
1889                }
1890
1891                // everything ok-> join the overlay!
1892                state = BaseOverlayStateCompleted;
1893                overlayInterface->createOverlay();
1894
1895                overlayInterface->joinOverlay( endpoints );
1896                overlayBootstrap.recordJoin( endpoints );
1897
1898                // update ovlvis
1899                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1900
1901                // inform all registered services of the event
1902                foreach( NodeListener* i, nodeListeners )
1903                    i->onJoinCompleted( spovnetId );
1904        }
1905        else
1906        {
1907                // this is not the first bootstrap, just join the additional node
1908                logging_debug("not first-time bootstrapping");
1909                overlayInterface->joinOverlay( endpoints );
1910                overlayBootstrap.recordJoin( endpoints );
1911        } // if( overlayInterface == NULL )
1912
1913        return true;
1914}
1915
1916
1917bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
1918{
1919        // get service
1920        const ServiceID& service = ld->service; //overlayMsg->getService();
1921
1922        logging_debug( "Received data for service " << service.toString()
1923                        << " on link " << overlayMsg->getDestinationLink().toString() );
1924
1925        // delegate data message
1926        CommunicationListener* lst = getListener(service);
1927        if(lst != NULL){
1928                lst->onMessage(
1929                                message,
1930//                              overlayMsg->getSourceNode(),
1931//                              overlayMsg->getDestinationLink(),
1932                                ld->remoteNode,
1933                                ld->overlayId,
1934                overlayMsg->getSeqNum(),
1935                                overlayMsg
1936                );
1937        }
1938
1939        return true;
1940}
1941
1942bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
1943{
1944    /**
1945     * Deserialize MessageLost-Message
1946     *
1947     * - Type of lost message
1948     * - Hop count of lost message
1949     * - Source-LinkID  of lost message
1950     */
1951    const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
1952    uint8_t type = buff[0];
1953    uint8_t hops = buff[1];
1954    LinkID linkid;
1955    linkid.deserialize(message(sizeof(uint8_t)*2));
1956   
1957    logging_warn("Node " << msg->getSourceNode()
1958            << " informed us, that our message of type " << (int) type
1959            << " is lost after traveling " << (int) hops << " hops."
1960            << " (LinkID: " << linkid.toString());
1961
1962   
1963    // TODO switch-case ?
1964   
1965    // BRANCH: LinkRequest --> link request failed
1966    if ( type == OverlayMsg::typeLinkRequest )
1967    {
1968        __onLinkEstablishmentFailed(linkid);
1969    }
1970   
1971    // BRANCH: Data --> link disrupted. Drop link.
1972    //   (We could use something more advanced here. e.g. At least send a
1973    //    keep-alive message and wait for a keep-alive reply.)
1974    if ( type == OverlayMsg::typeData )
1975    {
1976        LinkDescriptor* link_desc = getDescriptor(linkid);
1977       
1978        if ( link_desc )
1979        {
1980            link_desc->failed = true;
1981        }
1982       
1983        dropLink(linkid);
1984    }
1985   
1986    // BRANCH: ping lost
1987    if ( type == OverlayMsg::typePing )
1988    {
1989        CommunicationListener* lst = getListener(msg->getService());
1990        if( lst != NULL )
1991        {
1992            lst->onPingLost(msg->getSourceNode());
1993        }
1994    }
1995   
1996    return true;
1997}
1998
1999bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2000{
2001    // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
2002   
2003    bool send_pong = false;
2004   
2005    // inform application and ask permission to send a pong message
2006    CommunicationListener* lst = getListener(overlayMsg->getService());
2007    if( lst != NULL )
2008    {
2009        send_pong = lst->onPing(overlayMsg->getSourceNode());
2010    }
2011   
2012    // send pong message if allowed
2013    if ( send_pong )
2014    {
2015        OverlayMsg pong_msg(OverlayMsg::typePong);
2016        pong_msg.setSeqNum(overlayMsg->getSeqNum());
2017       
2018        // send message
2019        try
2020        {
2021            send_node( &pong_msg, 
2022                overlayMsg->getSourceNode(), 
2023                system_priority::OVERLAY,
2024                overlayMsg->getService() );
2025        }
2026        catch ( message_not_sent& e )
2027        {
2028            logging_info("Could not send Pong-Message to node: " << 
2029                overlayMsg->getSourceNode());
2030        }
2031    }
2032}
2033
2034bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2035{
2036    // inform application
2037    CommunicationListener* lst = getListener(overlayMsg->getService());
2038    if( lst != NULL )
2039    {
2040        lst->onPong(overlayMsg->getSourceNode());
2041    }
2042}
2043
2044bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2045
2046        if( ld == NULL ) {
2047                logging_warn( "received overlay update message for link for "
2048                                << "which we have no mapping" );
2049                return false;
2050        }
2051        logging_info("Received type update message on link " << ld );
2052
2053        // update our link mapping information for this link
2054        bool changed =
2055                        ( ld->remoteNode != overlayMsg->getSourceNode() )
2056                        || ( ld->service != overlayMsg->getService() );
2057
2058        // set parameters
2059        ld->up         = true;
2060        ld->remoteNode = overlayMsg->getSourceNode();
2061        ld->remoteLink = overlayMsg->getSourceLink();
2062        ld->service    = overlayMsg->getService();
2063        ld->autolink   = overlayMsg->isAutoLink();
2064
2065        // if our link information changed, we send out an update, too
2066        if( changed ) {
2067                overlayMsg->swapRoles();
2068                overlayMsg->setSourceNode(nodeId);
2069                overlayMsg->setSourceLink(ld->overlayId);
2070                overlayMsg->setService(ld->service);
2071                send( overlayMsg, ld, system_priority::OVERLAY );
2072        }
2073
2074        // service registered? no-> error!
2075        if( !communicationListeners.contains( ld->service ) ) {
2076                logging_warn( "Link up: event listener has not been registered" );
2077                return false;
2078        }
2079
2080        // default or no service registered?
2081        CommunicationListener* listener = communicationListeners.get( ld->service );
2082        if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
2083                logging_warn("Link up: event listener is default or null!" );
2084                return true;
2085        }
2086
2087        // update descriptor
2088        ld->listener = listener;
2089        ld->setAutoUsed();
2090        ld->setAlive();
2091
2092        // ask the service whether it wants to accept this link
2093        if( !listener->onLinkRequest(ld->remoteNode) ) {
2094
2095                logging_debug("Link id=" << ld->overlayId.toString() <<
2096                                " has been denied by service " << ld->service.toString() << ", dropping link");
2097
2098                // prevent onLinkDown calls to the service
2099                ld->listener = &CommunicationListener::DEFAULT;
2100
2101                // drop the link
2102                dropLink( ld->overlayId );
2103                return true;
2104        }
2105
2106        // set link up
2107        ld->up = true;
2108        logging_info( "Link has been accepted by service and is up: " << ld );
2109
2110        // auto links: link has been accepted -> send queued messages
2111        if( ld->messageQueue.size() > 0 ) {
2112                logging_info( "Sending out queued messages on link " << ld );
2113        foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
2114        {
2115            sendMessage( msg.message, ld->overlayId, msg.priority );
2116        }
2117                ld->messageQueue.clear();
2118        }
2119
2120        // call the notification functions
2121        listener->onLinkUp( ld->overlayId, ld->remoteNode );
2122        sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
2123
2124   
2125    // notify the application if this is the first link to a different node
2126    if ( not connected )
2127    {
2128        connected = true;
2129       
2130        foreach( NodeListener* i, nodeListeners )
2131            i->onOverlayConnected( spovnetId );
2132    }
2133   
2134        return true;
2135}
2136
2137/// handle a link request and reply
2138bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2139
2140        //TODO: Check if a request has already been sent using getSourceLink() ...
2141
2142        // create link descriptor
2143        LinkDescriptor* ldn = addDescriptor();
2144
2145        // flags
2146        ldn->up = true;
2147        ldn->fromRemote = true;
2148        ldn->relayed = true;
2149
2150        // parameters
2151        ldn->service = overlayMsg->getService();
2152        ldn->listener = getListener(ldn->service);
2153        ldn->remoteNode = overlayMsg->getSourceNode();
2154        ldn->remoteLink = overlayMsg->getSourceLink();
2155        ldn->hops = overlayMsg->getNumHops();
2156       
2157    // initialize sequence numbers
2158    ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
2159    logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
2160   
2161   
2162        // update time-stamps
2163        ldn->setAlive();
2164        ldn->setAutoUsed();
2165
2166        logging_info( "Link request received from node id="
2167                << overlayMsg->getSourceNode()
2168                << " LINK: "
2169                << ldn);
2170       
2171        // create reply message and send back!
2172        overlayMsg->swapRoles(); // swap source/destination
2173        overlayMsg->setType(OverlayMsg::typeLinkReply);
2174        overlayMsg->setSourceLink(ldn->overlayId);
2175        overlayMsg->setRelayed(true);
2176//      overlayMsg->setRouteRecord(true);
2177    overlayMsg->setSeqNum(ld->last_sent_seqnum);
2178       
2179        // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
2180        // append our endpoints (for creation of a direct link)
2181        overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
2182       
2183        send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
2184
2185        // inform listener
2186        if(ldn != NULL && ldn->listener != NULL)
2187                ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2188
2189        return true;
2190}
2191
2192bool BaseOverlay::handleLinkReply(
2193        OverlayMsg* overlayMsg,
2194        reboost::shared_buffer_t sub_message,
2195        LinkDescriptor* ld )
2196{
2197    // deserialize EndpointDescriptor
2198    EndpointDescriptor endpoints;
2199    endpoints.deserialize(sub_message);
2200   
2201        // find link request
2202        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
2203
2204        // not found? yes-> drop with error!
2205        if (ldn == NULL) {
2206                logging_error( "No link request pending for "
2207                                << overlayMsg->getDestinationLink().toString() );
2208                return false;
2209        }
2210        logging_debug("Handling link reply for " << ldn )
2211
2212        // check if already up
2213        if (ldn->up) {
2214                logging_warn( "Link already up: " << ldn );
2215                return true;
2216        }
2217
2218        // debug message
2219        logging_info( "Link request reply received. Establishing link"
2220                        << " for service " << overlayMsg->getService().toString()
2221                        << " with local id=" << overlayMsg->getDestinationLink()
2222                        << " and remote link id=" << overlayMsg->getSourceLink()
2223                        << " to " << endpoints.toString()
2224                        << " hop count: " << overlayMsg->getRouteRecord().size()
2225        );
2226
2227        // set local link descriptor data
2228        ldn->up = true;
2229        ldn->relayed = true;
2230        ldn->service = overlayMsg->getService();
2231        ldn->listener = getListener(ldn->service);
2232        ldn->remoteLink = overlayMsg->getSourceLink();
2233        ldn->remoteNode = overlayMsg->getSourceNode();
2234
2235        // update timestamps
2236        ldn->setAlive();
2237        ldn->setAutoUsed();
2238
2239        // auto links: link has been accepted -> send queued messages
2240        if( ldn->messageQueue.size() > 0 ) {
2241                logging_info( "Sending out queued messages on link " <<
2242                                ldn->overlayId.toString() );
2243                foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
2244                {
2245                        sendMessage( msg.message, ldn->overlayId, msg.priority );
2246                }
2247                ldn->messageQueue.clear();
2248        }
2249
2250        // inform listeners about new link
2251        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2252
2253        // try to replace relay link with direct link
2254        ldn->retryCounter = 3;
2255        ldn->endpoint = endpoints;
2256        ldn->communicationId =  bc->establishLink( ldn->endpoint );
2257
2258        return true;
2259}
2260
2261/// handle a keep-alive message for a link
2262bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2263{
2264        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
2265       
2266        if ( rld != NULL )
2267        {
2268                logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
2269                if (overlayMsg->isRouteRecord())
2270                {
2271                        rld->routeRecord = overlayMsg->getRouteRecord();
2272                }
2273               
2274                // set alive
2275                rld->setAlive();
2276               
2277               
2278                /* answer keep alive */
2279                if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
2280                {
2281            time_t now = time(NULL);
2282            logging_debug("[BaseOverlay] Answering KeepAlive over "
2283                    << ld->to_string()
2284                    << " after "
2285                    << difftime( now, ld->keepAliveSent )
2286                    << "s");
2287           
2288            OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
2289                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
2290            msg.setRouteRecord(true);
2291            ld->keepAliveSent = now;
2292            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
2293                }
2294
2295                return true;
2296        }
2297        else
2298        {
2299                logging_error("No Keep-Alive for "
2300                                << overlayMsg->getDestinationLink() << ": link unknown." );
2301                return false;
2302        }
2303}
2304
2305/// handle a direct link message
2306bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2307        logging_debug( "Received direct link replacement request" );
2308
2309        /// get destination overlay link
2310        LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
2311        if (rld == NULL || ld == NULL) {
2312                logging_error("Direct link replacement: Link "
2313                                << overlayMsg->getDestinationLink() << "not found error." );
2314                return false;
2315        }
2316        logging_info( "Received direct link convert notification for " << rld );
2317
2318        // update information
2319        rld->communicationId = ld->communicationId;
2320        rld->communicationUp = true;
2321        rld->relayed = false;
2322
2323        // mark used and alive!
2324        rld->setAlive();
2325        rld->setAutoUsed();
2326
2327        // erase the original descriptor
2328        eraseDescriptor(ld->overlayId);
2329       
2330    // inform listener
2331    if( rld->listener != NULL)
2332        rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
2333       
2334        return true;
2335}
2336
2337/// handles an incoming message
2338bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
2339                const LinkID bcLink )
2340{
2341        // decapsulate overlay message
2342        OverlayMsg* overlayMsg = new OverlayMsg();
2343        reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
2344
2345//      // XXX debug
2346//      logging_info( "Received overlay message."
2347//              << " Hops: " << (int) overlayMsg->getNumHops()
2348//              << " Type: " << (int) overlayMsg->getType()
2349//              << " Payload size: " << sub_buff.size()
2350//             << " SeqNum: " << overlayMsg->getSeqNum() );
2351       
2352       
2353        // increase number of hops
2354        overlayMsg->increaseNumHops();
2355
2356        // refresh relay information
2357        refreshRelayInformation( overlayMsg, ld );
2358
2359        // update route record
2360        overlayMsg->addRouteRecord(nodeId);
2361
2362        // handle signaling messages (do not route!)
2363        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
2364                        overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
2365        {
2366                overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
2367                delete overlayMsg;
2368                return true;
2369        }
2370
2371        // message for reached destination? no-> route message
2372        if (!overlayMsg->getDestinationNode().isUnspecified() &&
2373                        overlayMsg->getDestinationNode() != nodeId ) {
2374                logging_debug("Routing message "
2375                                << " from " << overlayMsg->getSourceNode()
2376                                << " to " << overlayMsg->getDestinationNode()
2377                );
2378               
2379//              // XXX testing AKTUELL
2380//        logging_info("MARIO: Routing message "
2381//                << " from " << overlayMsg->getSourceNode()
2382//                << " to " << overlayMsg->getDestinationNode() );
2383//        logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
2384                overlayMsg->append_buffer(sub_buff);
2385               
2386                route( overlayMsg, ld->remoteNode );
2387                delete overlayMsg;
2388                return true;
2389        }
2390
2391       
2392        /* handle base overlay message */
2393        bool ret = false; // return value
2394        try
2395        {
2396        switch ( overlayMsg->getType() ) 
2397        {
2398            // data transport messages
2399            case OverlayMsg::typeData:
2400            {
2401                // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
2402                LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
2403               
2404                if ( ! end_to_end_ld )
2405                {
2406                    logging_warn("Error: Data-Message claims to belong to a link we don't know.");
2407                   
2408                    ret = false;
2409                }
2410                else
2411                {
2412                    // message received --> link is alive
2413                    end_to_end_ld->keepAliveReceived = time(NULL);
2414                    // hop count on this link
2415                    end_to_end_ld->hops = overlayMsg->getNumHops();
2416                   
2417                    // * call handler *
2418                    ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
2419                }
2420               
2421                break;
2422            }
2423            case OverlayMsg::typeMessageLost:
2424                ret = handleLostMessage(sub_buff, overlayMsg);
2425               
2426                break;
2427       
2428                // overlay setup messages
2429            case OverlayMsg::typeJoinRequest:
2430                ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink );        break;
2431            case OverlayMsg::typeJoinReply:
2432                ret = handleJoinReply(sub_buff, bcLink );       break;
2433       
2434                // link specific messages
2435            case OverlayMsg::typeLinkRequest:
2436                ret = handleLinkRequest(overlayMsg, ld );       break;
2437            case OverlayMsg::typeLinkReply:
2438                ret = handleLinkReply(overlayMsg, sub_buff, ld );       break;
2439            case OverlayMsg::typeLinkUpdate:
2440                ret = handleLinkUpdate(overlayMsg, ld );        break;
2441            case OverlayMsg::typeKeepAlive:
2442            case OverlayMsg::typeKeepAliveReply:
2443                ret = handleLinkAlive(overlayMsg, ld );         break;
2444            case OverlayMsg::typeLinkDirect:
2445                ret = handleLinkDirect(overlayMsg, ld );        break;
2446               
2447            case OverlayMsg::typeLinkClose:
2448            {
2449                dropLink(overlayMsg->getDestinationLink());
2450                __removeDroppedLink(overlayMsg->getDestinationLink());
2451               
2452                break;
2453            }
2454           
2455            /// ping over overlay path (or similar)
2456            case OverlayMsg::typePing:
2457            {
2458                ret = handlePing(overlayMsg, ld);
2459                break;
2460            }
2461            case OverlayMsg::typePong:
2462            {
2463                ret = handlePong(overlayMsg, ld);
2464                break;
2465            }
2466           
2467                // handle unknown message type
2468            default:
2469            {
2470                logging_error( "received message in invalid state! don't know " <<
2471                        "what to do with this message of type " << overlayMsg->getType() );
2472                ret = false;
2473                break;
2474            }
2475        }
2476        }
2477        catch ( reboost::illegal_sub_buffer& e )
2478        {
2479            logging_error( "Failed to create sub-buffer while reading message: »"
2480                    << e.what()
2481                    << "« Message too short? ");
2482           
2483            assert(false); // XXX
2484        }
2485
2486        // free overlay message and return value
2487        delete overlayMsg;
2488        return ret;
2489}
2490
2491// ----------------------------------------------------------------------------
2492
2493void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
2494
2495        logging_debug( "broadcasting message to all known nodes " <<
2496                        "in the overlay from service " + service.toString() );
2497
2498        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
2499        for(size_t i=0; i<nodes.size(); i++){
2500                NodeID& id = nodes.at(i);
2501                if(id == this->nodeId) continue; // don't send to ourselfs
2502
2503                sendMessage( message, id, priority, service );
2504        }
2505}
2506
2507/// return the overlay neighbors
2508vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
2509        // the known nodes _can_ also include our node, so we remove ourself
2510        vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
2511        vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
2512        if( i != nodes.end() ) nodes.erase( i );
2513        return nodes;
2514}
2515
2516const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
2517        if( lid == LinkID::UNSPECIFIED ) return nodeId;
2518        const LinkDescriptor* ld = getDescriptor(lid);
2519        if( ld == NULL ) return NodeID::UNSPECIFIED;
2520        else return ld->remoteNode;
2521}
2522
2523vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
2524        vector<LinkID> linkvector;
2525        foreach( LinkDescriptor* ld, links ) {
2526                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
2527                        linkvector.push_back( ld->overlayId );
2528                }
2529        }
2530        return linkvector;
2531}
2532
2533
2534void BaseOverlay::onNodeJoin(const NodeID& node) {
2535        JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
2536        if( i == joiningNodes.end() ) return;
2537
2538        logging_info( "node has successfully joined baseoverlay and overlay structure "
2539                        << node.toString() );
2540
2541        joiningNodes.erase( i );
2542}
2543
2544void BaseOverlay::eventFunction() {
2545        stabilizeRelays();
2546        stabilizeLinks();
2547        updateVisual();
2548}
2549
2550
2551
2552/* link status */
2553bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
2554{
2555    const LinkDescriptor* ld = getDescriptor(lnk);
2556   
2557    if (!ld)
2558        return false;
2559   
2560    return ld->communicationUp && !ld->relayed;
2561}
2562
2563int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
2564{
2565    const LinkDescriptor* ld = getDescriptor(lnk);
2566   
2567    if (!ld)
2568        return -1;
2569   
2570    return ld->hops;   
2571}
2572
2573
2574bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
2575{
2576    time_t now = time(NULL);
2577
2578    return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
2579}
2580
2581bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
2582{
2583    return isLinkVital(link) && link->communicationUp && !link->relayed;
2584}
2585
2586/* [link status] */
2587
2588
2589void BaseOverlay::updateVisual(){
2590
2591        //
2592        // update base overlay structure
2593        //
2594
2595        static NodeID pre = NodeID::UNSPECIFIED;
2596        static NodeID suc = NodeID::UNSPECIFIED;
2597
2598        vector<NodeID> nodes = this->getOverlayNeighbors(false);
2599
2600        if(nodes.size() == 0){
2601
2602                if(pre != NodeID::UNSPECIFIED){
2603                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2604                        pre = NodeID::UNSPECIFIED;
2605                }
2606                if(suc != NodeID::UNSPECIFIED){
2607                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2608                        suc = NodeID::UNSPECIFIED;
2609                }
2610
2611        } // if(nodes.size() == 0)
2612
2613        if(nodes.size() == 1){
2614                // only one node, make this pre and succ
2615                // and then go into the node.size()==2 case
2616                //nodes.push_back(nodes.at(0));
2617
2618                if(pre != nodes.at(0)){
2619                        pre = nodes.at(0);
2620                        if(pre != NodeID::UNSPECIFIED)
2621                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2622                }
2623        }
2624
2625        if(nodes.size() == 2){
2626
2627                // old finger
2628                if(nodes.at(0) != pre){
2629                        if(pre != NodeID::UNSPECIFIED)
2630                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2631                        pre = NodeID::UNSPECIFIED;
2632                }
2633                if(nodes.at(1) != suc){
2634                        if(suc != NodeID::UNSPECIFIED)
2635                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2636                        suc = NodeID::UNSPECIFIED;
2637                }
2638
2639                // connect with fingers
2640                if(pre == NodeID::UNSPECIFIED){
2641                        pre = nodes.at(0);
2642                        if(pre != NodeID::UNSPECIFIED)
2643                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2644                }
2645                if(suc == NodeID::UNSPECIFIED){
2646                        suc = nodes.at(1);
2647                        if(suc != NodeID::UNSPECIFIED)
2648                                visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
2649                }
2650
2651        } //if(nodes.size() == 2)
2652
2653//      {
2654//              logging_error("================================");
2655//              logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
2656//              logging_error("================================");
2657//              if(nodes.size()>= 1){
2658//                      logging_error("real pre " << nodes.at(0).toString());
2659//                      logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
2660//              }
2661//              if(nodes.size()>= 2){
2662//                      logging_error("real suc " << nodes.at(1).toString());
2663//                      logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
2664//              }
2665//              logging_error("================================");
2666//              if(pre == NodeID::UNSPECIFIED){
2667//                      logging_error("pre: unspecified");
2668//              }else{
2669//                      unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
2670//                      logging_error("pre: " << prei);
2671//              }
2672//              if(suc == NodeID::UNSPECIFIED){
2673//                      logging_error("suc: unspecified");
2674//              }else{
2675//                      unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
2676//                      logging_error("suc: " << suci);
2677//              }
2678//              logging_error("================================");
2679//      }
2680
2681        //
2682        // update base communication links
2683        //
2684
2685        static set<NodeID> linkset;
2686        set<NodeID> remotenodes;
2687        foreach( LinkDescriptor* ld, links ) {
2688                if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
2689                        continue;
2690
2691                if (ld->routeRecord.size()>1 && ld->relayed) {
2692                        for (size_t i=1; i<ld->routeRecord.size(); i++)
2693                                remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
2694                } else {
2695                        remotenodes.insert(ld->remoteNode);
2696                }
2697        }
2698
2699        // which links are old and need deletion?
2700        bool changed = false;
2701
2702        do{
2703                changed = false;
2704                foreach(NodeID n, linkset){
2705                        if(remotenodes.find(n) == remotenodes.end()){
2706                                visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
2707                                linkset.erase(n);
2708                                changed = true;
2709                                break;
2710                        }
2711                }
2712        }while(changed);
2713
2714        // which links are new and need creation?
2715        do{
2716                changed = false;
2717                foreach(NodeID n, remotenodes){
2718                        if(linkset.find(n) == linkset.end()){
2719                                visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
2720                                linkset.insert(n);
2721                                changed = true;
2722                                break;
2723                        }
2724                }
2725        }while(changed);
2726
2727}
2728
2729// ----------------------------------------------------------------------------
2730
2731std::string BaseOverlay::debugInformation() {
2732        std::stringstream s;
2733        int i=0;
2734
2735        // dump overlay information
2736        s << "Long debug info ... [see below]" << endl << endl;
2737        s << "--- overlay information ----------------------" << endl;
2738        s << overlayInterface->debugInformation() << endl;
2739
2740        // dump link state
2741        s << "--- link state -------------------------------" << endl;
2742        foreach( LinkDescriptor* ld, links ) {
2743                s << "link " << i << ": " << ld << endl;
2744                i++;
2745        }
2746        s << endl << endl;
2747
2748        return s.str();
2749}
2750
2751}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.