An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/BaseOverlay.cpp @ 12060

Last change on this file since 12060 was 12060, checked in by hock@…, 9 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue? can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery? discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg?)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers? (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 78.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59#include <ariba/utility/misc/sha1.h>
60
61namespace ariba {
62namespace overlay {
63
64using namespace std;
65using ariba::transport::system_priority;
66
67#define visualInstance          ariba::utility::DddVis::instance()
68#define visualIdOverlay         ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
69#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
70
71
72// time constants (in seconds)
73#define KEEP_ALIVE_TIME         60                      // send keep-alive message after link is not used for #s 
74
75#define LINK_ESTABLISH_TIME_OUT 10                      // timeout: link requested but not up
76#define KEEP_ALIVE_TIME_OUT     KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT     // timeout: no data received on this link (incl. keep-alive messages)
77#define AUTO_LINK_TIME_OUT      KEEP_ALIVE_TIME_OUT                    // timeout: auto link not used for #s
78
79
80// ----------------------------------------------------------------------------
81
82/* *****************************************************************************
83 * PREREQUESITES
84 * ****************************************************************************/
85
86CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
87        if( !communicationListeners.contains( service ) ) {
88                logging_info( "No listener found for service " << service.toString() );
89                return NULL;
90        }
91        CommunicationListener* listener = communicationListeners.get( service );
92        assert( listener != NULL );
93        return listener;
94}
95
96// link descriptor handling ----------------------------------------------------
97
98LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
99        foreach( LinkDescriptor* lp, links )
100                                if ((communication ? lp->communicationId : lp->overlayId) == link)
101                                        return lp;
102        return NULL;
103}
104
105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
106        foreach( const LinkDescriptor* lp, links )
107                                if ((communication ? lp->communicationId : lp->overlayId) == link)
108                                        return lp;
109        return NULL;
110}
111
112/// erases a link descriptor
113void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
114        for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
115                LinkDescriptor* ld = *i;
116                if ((communication ? ld->communicationId : ld->overlayId) == link) {
117                        delete ld;
118                        links.erase(i);
119                        break;
120                }
121        }
122}
123
124/// adds a link descriptor
125LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
126        LinkDescriptor* desc = getDescriptor( link );
127        if ( desc == NULL ) {
128                desc = new LinkDescriptor();
129                if (!link.isUnspecified()) desc->overlayId = link;
130                links.push_back(desc);
131        }
132        return desc;
133}
134
135/// returns a auto-link descriptor
136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
137{
138        // search for a descriptor that is already up
139        foreach( LinkDescriptor* lp, links )
140    {
141        if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
142            return lp;
143    }
144       
145        // if not found, search for one that is about to come up...
146        foreach( LinkDescriptor* lp, links )
147        {
148            time_t now = time(NULL);
149           
150        if (lp->autolink && lp->remoteNode == node && lp->service == service
151                && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
152            return lp;
153        }
154       
155        return NULL;
156}
157
158/// stabilizes link information
159void BaseOverlay::stabilizeLinks() {
160    time_t now = time(NULL);
161   
162    // send keep-alive messages over established links
163        foreach( LinkDescriptor* ld, links )
164    {
165                if (!ld->up) continue;
166               
167                if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
168                {
169                    logging_debug("[BaseOverlay] Sending KeepAlive over "
170                            << ld->to_string()
171                            << " after "
172                            << difftime( now, ld->keepAliveSent )
173                            << "s");
174                   
175            OverlayMsg msg( OverlayMsg::typeKeepAlive,
176                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
177            msg.setRouteRecord(true);
178            ld->keepAliveSent = now;
179            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
180                }
181        }
182
183        // iterate over all links and check for time boundaries
184        vector<LinkDescriptor*> oldlinks;
185        foreach( LinkDescriptor* ld, links ) {
186
187                // link connection request stale?
188                if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT )  // NOTE: keepAliveReceived == now, on connection request
189                {
190            logging_info( "Link connection request is stale, closing: " << ld );
191            ld->failed = true;
192            oldlinks.push_back( ld );
193            continue;
194                }
195
196                if (!ld->up) continue;
197
198               
199               
200               
201                // check if link is relayed and retry connecting directly
202                // TODO Mario: What happens here?  --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
203                if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
204                        ld->retryCounter--;
205                        ld->communicationId = bc->establishLink( ld->endpoint );
206                }
207
208                // remote used as relay flag
209                if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT)  // TODO is this a reasonable timeout ??
210                        ld->relaying = false;
211
212                // drop links that are dropped and not used as relay
213                if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
214                        oldlinks.push_back( ld );
215                        continue;
216                }
217
218                // auto-link time exceeded?
219                if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
220                        oldlinks.push_back( ld );
221                        continue;
222                }
223
224                // keep alives missed? yes->
225                if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
226                {
227            logging_info( "Link is stale, closing: " << ld );
228            ld->failed = true;
229            oldlinks.push_back( ld );
230            continue;
231                }
232        }
233
234        // drop links
235        foreach( LinkDescriptor* ld, oldlinks ) {
236                logging_info( "Link timed out. Dropping " << ld );
237                ld->relaying = false;
238                dropLink( ld->overlayId );
239        }
240
241       
242       
243       
244        // show link state  (debug output)
245        if (counter>=10 || counter<0)
246    {
247            showLinks();
248            counter = 0;
249    }
250        else
251        {
252            counter++;
253        }
254}
255
256
257std::string BaseOverlay::getLinkHTMLInfo() {
258        std::ostringstream s;
259        vector<NodeID> nodes;
260        if (links.size()==0) {
261                s << "<h2 style=\"color=#606060\">No links established!</h2>";
262        } else {
263                s << "<h2 style=\"color=#606060\">Links</h2>";
264                s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
265                s << "<tr style=\"background-color=#ffe0e0\">";
266                s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
267                s << "</tr>";
268
269                int i=0;
270                foreach( LinkDescriptor* ld, links ) {
271                        if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
272                        bool found = false;
273                        foreach(NodeID& id, nodes)
274                        if (id  == ld->remoteNode) found = true;
275                        if (found) continue;
276                        i++;
277                        nodes.push_back(ld->remoteNode);
278                        if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
279                        else s << "<tr>";
280                        s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
281                        s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
282                        s << "<td>";
283                        if (ld->routeRecord.size()>1 && ld->relayed) {
284                                for (size_t i=1; i<ld->routeRecord.size(); i++)
285                                        s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
286                        } else {
287                                s << "Direct";
288                        }
289                        s << "</td>";
290                        s << "</tr>";
291                }
292                s << "</table>";
293        }
294        return s.str();
295}
296
297/// shows the current link state
298void BaseOverlay::showLinks() {
299        int i=0;
300        logging_info("--- link state -------------------------------");
301        foreach( LinkDescriptor* ld, links ) {
302                string epd = "";
303                if (isLinkDirectVital(ld))
304                {
305//                      epd = getEndpointDescriptor(ld->remoteNode).toString();
306                   
307                    epd = "Connection: ";
308                    epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
309                    epd += " <---> ";
310                    epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
311                }
312
313                logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
314                i++;
315        }
316        logging_info("----------------------------------------------");
317}
318
319/// compares two arbitrary links to the same node
320int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
321        LinkDescriptor* lhsld = getDescriptor(lhs);
322        LinkDescriptor* rhsld = getDescriptor(rhs);
323        if (lhsld==NULL || rhsld==NULL
324                        || !lhsld->up || !rhsld->up
325                        || lhsld->remoteNode != rhsld->remoteNode) return -1;
326
327        if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId)  )
328                return -1;
329
330        return 1;
331}
332
333
334// internal message delivery ---------------------------------------------------
335
336
337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
338{
339    // set priority
340    message->setPriority(priority);
341   
342    // wrap old OverlayMsg into reboost message
343    reboost::message_t wrapped_message = message->wrap_up_for_sending();
344   
345    // send down to BaseCommunication
346    try
347    {
348        // * send *
349        return bc->sendMessage(bc_link, wrapped_message, priority, false);
350    }
351    catch ( communication::communication_message_not_sent& e )
352    {
353        ostringstream out;
354        out << "Communication message not sent: " << e.what();
355        throw message_not_sent(out.str());
356    }
357   
358    throw logic_error("This should never happen!");
359}
360
361
362/// routes a message to its destination node
363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
364{
365        // exceeded time-to-live? yes-> drop message
366        if (message->getNumHops() > message->getTimeToLive())
367        {
368                logging_warn("Message exceeded TTL. Dropping message and relay routes "
369            << "for recovery. Hop count: " << (int) message->getNumHops());
370                removeRelayNode(message->getDestinationNode());
371                return;
372        }
373
374        // no-> forward message
375        else
376        {
377                // destinastion myself? yes-> handle message
378                if (message->getDestinationNode() == nodeId)
379                {
380                        logging_warn("Usually I should not route messages to myself. And I won't!");
381                }
382
383                // no->send message to next hop
384                else
385                {
386                    try
387                    {
388                /* (deep) packet inspection to determine priority */
389                // BRANCH: typeData  -->  send with low priority
390                if ( message->getType() == OverlayMsg::typeData )
391                {
392                    // TODO think about implementing explicit routing queue (with active queue management??)
393                    send( message,
394                          message->getDestinationNode(),
395                          message->getPriority(),
396                          last_hop );
397                }
398                // BRANCH: internal message  -->  send with higher priority
399                else
400                {
401                    send( message,
402                          message->getDestinationNode(),
403                          system_priority::HIGH,
404                          last_hop );
405                }
406                    }
407                    catch ( message_not_sent& e )
408                    {
409                        logging_warn("Unable to route message of type "
410                                << message->getType()
411                                << " to "
412                                << message->getDestinationNode()
413                                << ". Reason: "
414                                << e.what());
415                       
416                        // inform sender
417                if ( message->getType() != OverlayMsg::typeMessageLost )
418                {
419                    report_lost_message(message);
420                }
421                    }
422                }
423        }
424}
425
426void BaseOverlay::report_lost_message( const OverlayMsg* message )
427{
428    OverlayMsg reply(OverlayMsg::typeMessageLost);
429    reply.setSeqNum(message->getSeqNum());
430   
431    /**
432     * MessageLost-Message
433     *
434     * - Type of lost message
435     * - Hop count of lost message
436     * - Source-LinkID  of lost message
437     */
438    reboost::shared_buffer_t b(sizeof(uint8_t)*2);
439    b.mutable_data()[0] = message->getType();
440    b.mutable_data()[1] = message->getNumHops();
441    reply.append_buffer(b);
442    reply.append_buffer(message->getSourceLink().serialize());
443   
444    try
445    {
446        send_node(&reply, message->getSourceNode(),
447                system_priority::OVERLAY,
448                OverlayInterface::OVERLAY_SERVICE_ID);
449    }
450    catch ( message_not_sent& e )
451    {
452        logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
453    }
454}
455
456/// sends a message to another node, delivers it to the base overlay class
457seqnum_t BaseOverlay::send( OverlayMsg* message,
458        const NodeID& destination,
459        uint8_t priority,
460        const NodeID& last_hop ) throw(message_not_sent)
461{
462        LinkDescriptor* next_link = NULL;
463
464        // drop messages to unspecified destinations
465        if (destination.isUnspecified())
466            throw message_not_sent("No destination specified. Drop!");
467
468        // send messages to myself -> drop!
469    // TODO maybe this is not what we want. why not just deliver this message?
470    //   There is a similar check in the route function, there it should be okay.
471        if (destination == nodeId)
472        {
473            logging_warn("Sent message to myself. Drop!");
474           
475            throw message_not_sent("Sent message to myself. Drop!");
476        }
477
478        // use relay path?
479        if (message->isRelayed())
480        {
481                next_link = getRelayLinkTo( destination );
482               
483                if (next_link != NULL)
484                {
485                        next_link->setRelaying();
486
487                        // * send message over relayed link *
488                        return send_overlaymessage_down(message, next_link->communicationId, priority);
489                }
490                else
491                {
492                        logging_warn("No relay hop found to " << destination
493                                << " -- trying to route over overlay paths ...")
494                }
495        }
496
497       
498        // last resort -> route over overlay path
499        LinkID next_id = overlayInterface->getNextLinkId( destination );
500        if ( next_id.isUnspecified() )
501        {               
502        // apperently we're the closest node --> try second best node
503        //   NOTE: This is helpful if our routing table is not up-to-date, but
504        //   may lead to circles. So we have to be careful.
505        std::vector<const LinkID*> next_ids = 
506            overlayInterface->getSortedLinkIdsTowardsNode( destination );
507           
508        for ( int i = 0; i < next_ids.size(); i++ )
509        {
510            const LinkID& link = *next_ids[i];
511           
512            if ( ! link.isUnspecified() )
513            {
514                next_id = link;
515               
516                break;
517            }
518        }
519     
520        // still no next hop found. drop.
521        if ( next_id.isUnspecified() )
522        {
523            logging_warn("Could not send message. No next hop found to " <<
524                destination );
525            logging_error("ERROR: " << debugInformation() );
526           
527            throw message_not_sent("No next hop found.");
528        }
529        }
530
531       
532        /* get link descriptor, do some checks and send message */
533        next_link = getDescriptor(next_id);
534   
535    // check pointer
536    if ( next_link == NULL )
537    {
538        // NOTE: this shuldn't happen
539        throw message_not_sent("Could not send message. Link not known.");
540    }
541   
542    // avoid circles
543    if ( next_link->remoteNode == last_hop )
544    {
545        // XXX logging_debug
546        logging_info("Possible next hop would create a circle: "
547            << next_link->remoteNode);
548       
549        throw message_not_sent("Could not send message. Possible next hop would create a circle.");
550    }
551   
552    // check if link is up
553        if ( ! next_link->up)
554        {
555        logging_warn("Could not send message. Link not up");
556        logging_error("ERROR: " << debugInformation() );
557       
558        throw message_not_sent("Could not send message. Link not up");
559        }
560
561        // * send message over overlay link *
562        return send(message, next_link, priority);
563}
564
565
566/// send a message using a link descriptor, delivers it to the base overlay class
567seqnum_t BaseOverlay::send( OverlayMsg* message,
568        LinkDescriptor* ldr,
569        uint8_t priority ) throw(message_not_sent)
570{
571        // check if null
572        if (ldr == NULL)
573        {
574        ostringstream out;
575        out << "Can not send message to " << message->getDestinationAddress();
576        throw message_not_sent(out.str());
577        }
578
579        // check if up
580        if ( !ldr->up )
581        {
582                logging_error("DEBUG_INFO: " << debugInformation() );
583
584        ostringstream out;
585        out << "Can not send message. Link not up:" << ldr->to_string();
586        throw message_not_sent(out.str());
587        }
588       
589        LinkDescriptor* next_hop_ld = NULL;
590
591        // BRANCH: relayed link
592        if (ldr->relayed)
593        {
594                logging_debug("Resolving direct link for relayed link to "
595                                << ldr->remoteNode);
596               
597                next_hop_ld = getRelayLinkTo( ldr->remoteNode );
598               
599                if (next_hop_ld==NULL)
600                {
601                        logging_error("DEBUG_INFO: " << debugInformation() );
602                       
603                ostringstream out;
604                out << "No relay path found to link: " << ldr;
605                throw message_not_sent(out.str());
606                }
607               
608                next_hop_ld->setRelaying();
609                message->setRelayed(true);
610        }
611        // BRANCH: direct link
612        else
613        {
614                next_hop_ld = ldr;
615        }
616
617       
618        // check next hop-link
619        if ( ! next_hop_ld->communicationUp)
620        {
621            throw message_not_sent( "send(): Could not send message."
622                    " Not a relayed link and direct link is not up." );
623        }
624
625        // send over next link
626    logging_debug("send(): Sending message over direct link.");
627    return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
628
629}
630
631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
632        uint8_t priority, const ServiceID& service) throw(message_not_sent)
633{
634        message->setSourceNode(nodeId);
635        message->setDestinationNode(remote);
636        message->setService(service);
637        return send( message, remote, priority );
638}
639
640void BaseOverlay::send_link( OverlayMsg* message,
641        const LinkID& link,
642        uint8_t priority ) throw(message_not_sent)
643{
644        LinkDescriptor* ld = getDescriptor(link);
645        if (ld==NULL)
646        {
647            throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
648        }
649       
650        message->setSourceNode(nodeId);
651        message->setDestinationNode(ld->remoteNode);
652
653        message->setSourceLink(ld->overlayId);
654        message->setDestinationLink(ld->remoteLink);
655
656        message->setService(ld->service);
657        message->setRelayed(ld->relayed);
658   
659   
660    try
661    {
662        // * send message *
663        send( message, ld, priority );
664    }
665    catch ( message_not_sent& e )
666    {
667        // drop failed link
668        ld->failed = true;
669        dropLink(ld->overlayId);
670    }
671}
672
673// relay route management ------------------------------------------------------
674
675/// stabilize relay information
676void BaseOverlay::stabilizeRelays() {
677        vector<relay_route>::iterator i = relay_routes.begin();
678        while (i!=relay_routes.end() ) {
679                relay_route& route = *i;
680                LinkDescriptor* ld = getDescriptor(route.link);
681
682                // relay link still used and alive?
683                if (ld==NULL
684                                || !isLinkDirectVital(ld)
685                                || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT)  // TODO this was set to 8 before.. Is the new timeout better?
686                {
687                        logging_info("Forgetting relay information to node "
688                                        << route.node.toString() );
689                        i = relay_routes.erase(i);
690                } else
691                        i++;
692        }
693}
694
695void BaseOverlay::removeRelayLink( const LinkID& link ) {
696        vector<relay_route>::iterator i = relay_routes.begin();
697        while (i!=relay_routes.end() ) {
698                relay_route& route = *i;
699                if (route.link == link ) i = relay_routes.erase(i); else i++;
700        }
701}
702
703void BaseOverlay::removeRelayNode( const NodeID& remote ) {
704        vector<relay_route>::iterator i = relay_routes.begin();
705        while (i!=relay_routes.end() ) {
706                relay_route& route = *i;
707                if (route.node == remote ) i = relay_routes.erase(i); else i++;
708        }
709}
710
711/// refreshes relay information
712void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
713
714        // handle relayed messages from real links only
715        if (ld == NULL
716                        || ld->relayed
717                        || message->getSourceNode()==nodeId ) return;
718
719        // update usage information
720        if (message->isRelayed()) {
721                // try to find source node
722                foreach( relay_route& route, relay_routes ) {
723                        // relay route found? yes->
724                        if ( route.node == message->getDestinationNode() ) {
725                                ld->setRelaying();
726                                route.used = time(NULL);
727                        }
728                }
729
730        }
731
732        // register relay path
733        if (message->isRegisterRelay()) {
734                // set relaying
735                ld->setRelaying();
736
737                // try to find source node
738                foreach( relay_route& route, relay_routes ) {
739
740                        // relay route found? yes->
741                        if ( route.node == message->getSourceNode() ) {
742
743                                // refresh timer
744                                route.used = time(NULL);
745                                LinkDescriptor* rld = getDescriptor(route.link);
746
747                                // route has a shorter hop count or old link is dead? yes-> replace
748                                if (route.hops > message->getNumHops()
749                                                || rld == NULL
750                                                || !isLinkDirectVital(ld)) {
751                                        logging_info("Updating relay information to node "
752                                                        << route.node.toString()
753                                                        << " reducing to " << (int) message->getNumHops() << " hops.");
754                                        route.hops = message->getNumHops();
755                                        route.link = ld->overlayId;
756                                }
757                                return;
758                        }
759                }
760
761                // not found-> add new entry
762                relay_route route;
763                route.hops = message->getNumHops();
764                route.link = ld->overlayId;
765                route.node = message->getSourceNode();
766                route.used = time(NULL);
767                logging_info("Remembering relay information to node "
768                                << route.node.toString());
769                relay_routes.push_back(route);
770        }
771}
772
773/// returns a known "vital" relay link which is up and running
774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
775        // try to find source node
776        foreach( relay_route& route, relay_routes ) {
777                if (route.node == remote ) {
778                        LinkDescriptor* ld = getDescriptor( route.link );
779                        if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
780                                route.used = time(NULL);
781                                return ld;
782                        }
783                }
784        }
785        return NULL;
786}
787
788/* *****************************************************************************
789 * PUBLIC MEMBERS
790 * ****************************************************************************/
791
792use_logging_cpp(BaseOverlay);
793
794// ----------------------------------------------------------------------------
795
796BaseOverlay::BaseOverlay() :
797                        started(false),state(BaseOverlayStateInvalid),
798                        bc(NULL),
799                        nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
800                        sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
801                        counter(0) {
802}
803
804BaseOverlay::~BaseOverlay() {
805}
806
807// ----------------------------------------------------------------------------
808
809void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
810        logging_info("Starting...");
811
812        // set parameters
813        bc = _basecomm;
814        nodeId = _nodeid;
815
816        // register at base communication
817        bc->registerMessageReceiver( this );
818        bc->registerEventListener( this );
819
820        // timer for auto link management
821        Timer::setInterval( 1000 ); // XXX
822//      Timer::setInterval( 10000 );
823        Timer::start();
824
825        started = true;
826        state = BaseOverlayStateInvalid;
827}
828
829void BaseOverlay::stop() {
830        logging_info("Stopping...");
831
832        // stop timer
833        Timer::stop();
834
835        // delete oberlay interface
836        if(overlayInterface != NULL) {
837                delete overlayInterface;
838                overlayInterface = NULL;
839        }
840
841        // unregister at base communication
842        bc->unregisterMessageReceiver( this );
843        bc->unregisterEventListener( this );
844
845        started = false;
846        state = BaseOverlayStateInvalid;
847}
848
849bool BaseOverlay::isStarted(){
850        return started;
851}
852
853// ----------------------------------------------------------------------------
854
855void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
856                const EndpointDescriptor& bootstrapEp) {
857
858        if(id != spovnetId){
859                logging_error("attempt to join against invalid spovnet, call initiate first");
860                return;
861        }
862
863        //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
864        logging_info( "Starting to join spovnet " << id.toString() <<
865                        " with nodeid " << nodeId.toString());
866
867        if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
868
869                //** FIRST STEP - MANDATORY */
870
871                // bootstrap against ourselfs
872                logging_info("joining spovnet locally");
873
874                overlayInterface->joinOverlay();
875                state = BaseOverlayStateCompleted;
876                foreach( NodeListener* i, nodeListeners )
877                        i->onJoinCompleted( spovnetId );
878
879                //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
880                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
881
882        } else {
883
884                //** SECOND STEP - OPTIONAL */
885
886                // bootstrap against another node
887                logging_info("joining spovnet remotely against " << bootstrapEp.toString());
888
889                const LinkID& lnk = bc->establishLink( bootstrapEp );
890                bootstrapLinks.push_back(lnk);
891                logging_info("join process initiated for " << id.toString() << "...");
892        }
893}
894
895
896void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
897        logging_debug("starting overlay bootstrap module");
898        overlayBootstrap.start(this, spovnetId, nodeId, modules);
899        overlayBootstrap.publish(bc->getEndpointDescriptor());
900}
901
902void BaseOverlay::stopBootstrapModules(){
903        logging_debug("stopping overlay bootstrap module");
904        overlayBootstrap.stop();
905        overlayBootstrap.revoke();
906}
907
908void BaseOverlay::leaveSpoVNet() {
909
910        logging_info( "Leaving spovnet " << spovnetId );
911        bool ret = ( state != this->BaseOverlayStateInvalid );
912
913        logging_debug( "Dropping all auto-links" );
914
915        // gather all service links
916        vector<LinkID> servicelinks;
917        foreach( LinkDescriptor* ld, links )
918        {
919                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
920                        servicelinks.push_back( ld->overlayId );
921        }
922
923        // drop all service links
924        foreach( LinkID lnk, servicelinks )
925        {
926            logging_debug("Dropping service link " << lnk.toString());
927            dropLink( lnk );
928        }
929
930        // let the node leave the spovnet overlay interface
931        logging_debug( "Leaving overlay" );
932        if( overlayInterface != NULL )
933        {
934                overlayInterface->leaveOverlay();
935        }
936
937        // drop still open bootstrap links
938        foreach( LinkID lnk, bootstrapLinks )
939        {
940            logging_debug("Dropping bootstrap link " << lnk.toString());
941            bc->dropLink( lnk );
942        }
943
944        // change to inalid state
945        state = BaseOverlayStateInvalid;
946        //ovl.visShutdown( ovlId, nodeId, string("") );
947
948        visualInstance.visShutdown(visualIdOverlay, nodeId, "");
949        visualInstance.visShutdown(visualIdBase, nodeId, "");
950
951        // inform all registered services of the event
952        foreach( NodeListener* i, nodeListeners )
953        {
954                if( ret ) i->onLeaveCompleted( spovnetId );
955                else i->onLeaveFailed( spovnetId );
956        }
957}
958
959void BaseOverlay::createSpoVNet(const SpoVNetID& id,
960                const OverlayParameterSet& param,
961                const SecurityParameterSet& sec,
962                const QoSParameterSet& qos) {
963
964        // set the state that we are an initiator, this way incoming messages are
965        // handled correctly
966        logging_info( "creating spovnet " + id.toString() <<
967                        " with nodeid " << nodeId.toString() );
968
969        spovnetId = id;
970
971        overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
972        if( overlayInterface == NULL ) {
973                logging_fatal( "overlay structure not supported" );
974                state = BaseOverlayStateInvalid;
975
976                foreach( NodeListener* i, nodeListeners )
977                i->onJoinFailed( spovnetId );
978
979                return;
980        }
981
982        visualInstance.visCreate(visualIdBase, nodeId, "", "");
983        visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
984}
985
986// ----------------------------------------------------------------------------
987
988const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
989                const NodeID& remoteId, const ServiceID& service ) {
990
991        // establish link via overlay
992        if (!remoteId.isUnspecified())
993                return establishLink( remoteId, service );
994        else
995                return establishDirectLink(remoteEp, service );
996}
997
998/// call base communication's establish link and add link mapping
999const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
1000                const ServiceID& service ) {
1001
1002        /// find a service listener
1003        if( !communicationListeners.contains( service ) ) {
1004                logging_error( "No listener registered for service id=" << service.toString() );
1005                return LinkID::UNSPECIFIED;
1006        }
1007        CommunicationListener* listener = communicationListeners.get( service );
1008        assert( listener != NULL );
1009
1010        // create descriptor
1011        LinkDescriptor* ld = addDescriptor();
1012        ld->relayed = false;
1013        ld->listener = listener;
1014        ld->service = service;
1015        ld->communicationId = bc->establishLink( ep );
1016
1017        /// establish link and add mapping
1018        logging_info("Establishing direct link " << ld->communicationId.toString()
1019                        << " using " << ep.toString());
1020
1021        return ld->communicationId;
1022}
1023
1024/// establishes a link between two arbitrary nodes
1025const LinkID BaseOverlay::establishLink( const NodeID& remote,
1026                const ServiceID& service ) {
1027
1028    // TODO What if we already have a Link to this node and this service id?
1029   
1030        // do not establish a link to myself!
1031        if (remote == nodeId) return 
1032                LinkID::UNSPECIFIED;
1033
1034       
1035        // create a link descriptor
1036        LinkDescriptor* ld = addDescriptor();
1037        ld->relayed = true;
1038        ld->remoteNode = remote;
1039        ld->service = service;
1040        ld->listener = getListener(ld->service);
1041   
1042    // initialize sequence numbers
1043    ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
1044    logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
1045
1046        // create link request message
1047        OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
1048        msg.setSourceLink(ld->overlayId);
1049
1050        // send over relayed link
1051        msg.setRelayed(true);
1052        msg.setRegisterRelay(true);
1053//      msg.setRouteRecord(true);
1054   
1055    msg.setSeqNum(ld->last_sent_seqnum);
1056
1057        // debug message
1058        logging_info(
1059                        "Sending link request with"
1060                        << " link=" << ld->overlayId.toString()
1061                        << " node=" << ld->remoteNode.toString()
1062                        << " serv=" << ld->service.toString()
1063        );
1064
1065       
1066        // sending message to node
1067        try
1068        {
1069            // * send *
1070            seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
1071        }
1072        catch ( message_not_sent& e )
1073        {
1074            logging_warn("Link request not sent: " << e.what());
1075           
1076            // Message not sent. Cancel link request.
1077            SystemQueue::instance().scheduleCall(
1078                    boost::bind(
1079                            &BaseOverlay::__onLinkEstablishmentFailed,
1080                            this,
1081                            ld->overlayId)
1082                );
1083        }
1084       
1085        return ld->overlayId;
1086}
1087
1088/// NOTE: "id" is an Overlay-LinkID
1089void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
1090{
1091    // TODO This code redundant. But also it's not easy to aggregate in one function.
1092   
1093    // get descriptor for link
1094    LinkDescriptor* ld = getDescriptor(id, false);
1095    if ( ld == NULL ) return; // not found? ->ignore!
1096
1097    logging_debug( "__onLinkEstablishmentFaild: " << ld );
1098
1099    // removing relay link information
1100    removeRelayLink(ld->overlayId);
1101
1102    // inform listeners about link down
1103    ld->communicationUp = false;
1104    if (!ld->service.isUnspecified())
1105    {
1106        CommunicationListener* lst = getListener(ld->service);
1107        if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
1108        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1109    }
1110
1111    // delete all queued messages (auto links)
1112    if( ld->messageQueue.size() > 0 ) {
1113        logging_warn( "Dropping link " << id.toString() << " that has "
1114                << ld->messageQueue.size() << " waiting messages" );
1115        ld->flushQueue();
1116    }
1117
1118    // erase mapping
1119    eraseDescriptor(ld->overlayId);
1120}
1121
1122
1123/// drops an established link
1124void BaseOverlay::dropLink(const LinkID& link)
1125{
1126        logging_info( "Dropping link: " << link.toString() );
1127
1128        // find the link item to drop
1129        LinkDescriptor* ld = getDescriptor(link);
1130        if( ld == NULL )
1131        {
1132                logging_warn( "Can't drop link, link is unknown!");
1133                return;
1134        }
1135
1136        // delete all queued messages
1137        if( ld->messageQueue.size() > 0 )
1138        {
1139                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
1140                                << ld->messageQueue.size() << " waiting messages" );
1141                ld->flushQueue();
1142        }
1143       
1144           
1145        // inform application and remote note (but only once)
1146        //   NOTE: If we initiated the drop, this function is called twice, but on
1147        //   the second call, there is noting to do.
1148        if ( ld->up && ! ld->failed )
1149        {
1150        // inform sideport and listener
1151        if(ld->listener != NULL)
1152        {
1153            ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
1154        }
1155        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
1156       
1157        // send link-close to remote node
1158        logging_info("Sending LinkClose message to remote node.");
1159        OverlayMsg close_msg(OverlayMsg::typeLinkClose);
1160        send_link(&close_msg, link, system_priority::OVERLAY);
1161   
1162        // deactivate link
1163        ld->up = false;
1164//         ld->closing = true;
1165        }
1166       
1167        else if ( ld->failed )
1168    {
1169        // inform listener
1170        if( ld->listener != NULL )
1171        {
1172            ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1173        }
1174       
1175        ld->up = false;
1176        __removeDroppedLink(ld->overlayId);
1177    }
1178}
1179
1180/// called from typeLinkClose-handler
1181void BaseOverlay::__removeDroppedLink(const LinkID& link)
1182{
1183    // find the link item to drop
1184    LinkDescriptor* ld = getDescriptor(link);
1185    if( ld == NULL )
1186    {
1187        return;
1188    }
1189
1190    // do not drop relay links
1191    if (!ld->relaying)
1192    {
1193        // drop the link in base communication
1194        if (ld->communicationUp)
1195        {
1196            bc->dropLink( ld->communicationId );
1197        }
1198
1199        // erase descriptor
1200        eraseDescriptor( ld->overlayId );
1201    }
1202    else
1203    {
1204        ld->dropAfterRelaying = true;
1205    }
1206}
1207
1208// ----------------------------------------------------------------------------
1209
1210/// internal send message, always use this functions to send messages over links
1211const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
1212        const LinkID& link,
1213        uint8_t priority ) throw(message_not_sent)
1214{
1215        logging_debug( "Sending data message on link " << link.toString() );
1216
1217        // get the mapping for this link
1218        LinkDescriptor* ld = getDescriptor(link);
1219        if( ld == NULL )
1220        {
1221            throw message_not_sent("Could not send message. Link not found id=" + link.toString());
1222        }
1223
1224        // check if the link is up yet, if its an auto link queue message
1225        if( !ld->up )
1226        {
1227                ld->setAutoUsed();
1228                if( ld->autolink )
1229                {
1230                        logging_info("Auto-link " << link.toString() << " not up, queue message");
1231                       
1232                        // queue message
1233                LinkDescriptor::message_queue_entry msg;
1234                msg.message = message;
1235                msg.priority = priority;
1236
1237                        ld->messageQueue.push_back( msg );
1238                       
1239                        return SequenceNumber::DISABLED;  // TODO what to return if message is queued?
1240                }
1241                else
1242                {
1243                    throw message_not_sent("Link " + link.toString() + " not up, drop message");
1244                }
1245        }
1246       
1247        // TODO AKTUELL: sequence numbers
1248        // TODO seqnum on fast path ?
1249        ld->last_sent_seqnum.increment();
1250       
1251        /* choose fast-path for direct links; normal overlay-path otherwise */
1252        // BRANCH: direct link
1253        if ( ld->communicationUp && !ld->relayed )
1254        {
1255            // * send down to BaseCommunication *
1256            try
1257            {
1258                bc->sendMessage(ld->communicationId, message, priority, true);
1259        }
1260        catch ( communication::communication_message_not_sent& e )
1261        {
1262            ostringstream out;
1263            out << "Communication message on fast-path not sent: " << e.what();
1264            throw message_not_sent(out.str());
1265        }
1266        }
1267
1268        // BRANCH: use (slow) overlay-path
1269        else
1270        {
1271        // compile overlay message (has service and node id)
1272        OverlayMsg overmsg( OverlayMsg::typeData );
1273        overmsg.set_payload_message(message);
1274       
1275        // set SeqNum
1276        if ( ld->transmit_seqnums )
1277        {
1278            overmsg.setSeqNum(ld->last_sent_seqnum);
1279        }
1280        logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
1281   
1282        // send message over relay/direct/overlay
1283        send_link( &overmsg, ld->overlayId, priority );
1284        }
1285       
1286        // return seqnum
1287        return ld->last_sent_seqnum;
1288}
1289
1290
1291const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
1292                const NodeID& node, uint8_t priority, const ServiceID& service) {
1293
1294        // find link for node and service
1295        LinkDescriptor* ld = getAutoDescriptor( node, service );
1296
1297        // if we found no link, create an auto link
1298        if( ld == NULL ) {
1299
1300                // debug output
1301                logging_info( "No link to send message to node "
1302                                << node.toString() << " found for service "
1303                                << service.toString() << ". Creating auto link ..."
1304                );
1305
1306                // call base overlay to create a link
1307                LinkID link = establishLink( node, service );
1308                ld = getDescriptor( link );
1309                if( ld == NULL ) {
1310                        logging_error( "Failed to establish auto-link.");
1311            throw message_not_sent("Failed to establish auto-link.");
1312                }
1313                ld->autolink = true;
1314
1315                logging_debug( "Auto-link establishment in progress to node "
1316                                << node.toString() << " with link id=" << link.toString() );
1317        }
1318        assert(ld != NULL);
1319
1320        // mark the link as used, as we now send a message through it
1321        ld->setAutoUsed();
1322
1323        // send / queue message
1324        return sendMessage( message, ld->overlayId, priority );
1325}
1326
1327
1328NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
1329        const NodeID& address, uint8_t priority, const ServiceID& service) {
1330   
1331    if ( overlayInterface->isClosestNodeTo(address) )
1332    {
1333        return NodeID::UNSPECIFIED;
1334    }
1335       
1336    const NodeID& closest_node = overlayInterface->getNextNodeId(address); 
1337   
1338    if ( closest_node != NodeID::UNSPECIFIED )
1339    {
1340        sendMessage(message, closest_node, priority, service);
1341    }
1342   
1343    return closest_node;  // return seqnum ?? tuple? closest_node via (non const) reference?
1344}
1345// ----------------------------------------------------------------------------
1346
1347const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1348                const LinkID& link) const {
1349
1350        // return own end-point descriptor
1351        if( link.isUnspecified() )
1352                return bc->getEndpointDescriptor();
1353
1354        // find link descriptor. not found -> return unspecified
1355        const LinkDescriptor* ld = getDescriptor(link);
1356        if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1357
1358        // return endpoint-descriptor from base communication
1359        return bc->getEndpointDescriptor( ld->communicationId );
1360}
1361
1362const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1363                const NodeID& node) const {
1364
1365        // return own end-point descriptor
1366        if( node == nodeId || node.isUnspecified() ) {
1367                //logging_info("getEndpointDescriptor: returning self.");
1368                return bc->getEndpointDescriptor();
1369        }
1370
1371        // no joined and request remote descriptor? -> fail!
1372        if( overlayInterface == NULL ) {
1373                logging_error( "Overlay interface not set, cannot resolve end-point." );
1374                return EndpointDescriptor::UNSPECIFIED();
1375        }
1376
1377//      // resolve end-point descriptor from the base-overlay routing table
1378//      const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1379//      if(ep.toString() != "") return ep;
1380
1381        // see if we can find the node in our own table
1382        foreach(const LinkDescriptor* ld, links){
1383                if(ld->remoteNode != node) continue;
1384                if(!ld->communicationUp) continue;
1385                const EndpointDescriptor& ep =
1386                                bc->getEndpointDescriptor(ld->communicationId);
1387                if(ep != EndpointDescriptor::UNSPECIFIED()) {
1388                        //logging_info("getEndpointDescriptor: using " << ld->to_string());
1389                        return ep;
1390                }
1391        }
1392
1393        logging_warn( "No EndpointDescriptor found for node " << node );
1394        logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
1395
1396        return EndpointDescriptor::UNSPECIFIED();
1397}
1398
1399// ----------------------------------------------------------------------------
1400
1401bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1402        sideport = _sideport;
1403        _sideport->configure( this );
1404        return true;
1405}
1406
1407bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1408        sideport = &SideportListener::DEFAULT;
1409        return true;
1410}
1411
1412// ----------------------------------------------------------------------------
1413
1414bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1415        logging_debug( "binding communication listener " << listener
1416                        << " on serviceid " << sid.toString() );
1417
1418        if( communicationListeners.contains( sid ) ) {
1419                logging_error( "some listener already registered for service id "
1420                                << sid.toString() );
1421                return false;
1422        }
1423
1424        communicationListeners.registerItem( listener, sid );
1425        return true;
1426}
1427
1428
1429bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1430        logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1431
1432        if( !communicationListeners.contains( sid ) ) {
1433                logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1434                return false;
1435        }
1436
1437        if( communicationListeners.get(sid) != listener ) {
1438                logging_warn( "listener bound to service id " << sid.toString()
1439                                << " is different than listener trying to unbind" );
1440                return false;
1441        }
1442
1443        communicationListeners.unregisterItem( sid );
1444        return true;
1445}
1446
1447// ----------------------------------------------------------------------------
1448
1449bool BaseOverlay::bind(NodeListener* listener) {
1450        logging_debug( "Binding node listener " << listener );
1451
1452        // already bound? yes-> warning
1453        NodeListenerVector::iterator i =
1454                        find( nodeListeners.begin(), nodeListeners.end(), listener );
1455        if( i != nodeListeners.end() ) {
1456                logging_warn("Node listener " << listener << " is already bound!" );
1457                return false;
1458        }
1459
1460        // no-> add
1461        nodeListeners.push_back( listener );
1462        return true;
1463}
1464
1465bool BaseOverlay::unbind(NodeListener* listener) {
1466        logging_debug( "Unbinding node listener " << listener );
1467
1468        // already unbound? yes-> warning
1469        NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1470        if( i == nodeListeners.end() ) {
1471                logging_warn( "Node listener " << listener << " is not bound!" );
1472                return false;
1473        }
1474
1475        // no-> remove
1476        nodeListeners.erase( i );
1477        return true;
1478}
1479
1480// ----------------------------------------------------------------------------
1481
1482void BaseOverlay::onLinkUp(const LinkID& id,
1483        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
1484{
1485        logging_debug( "Link up with base communication link id=" << id );
1486
1487        // get descriptor for link
1488        LinkDescriptor* ld = getDescriptor(id, true);
1489
1490        // BRANCH: handle bootstrap link we initiated
1491        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1492                logging_info(
1493                                "Join has been initiated by me and the link is now up. " <<
1494                                "LinkID: " << id.toString() <<
1495                                "Sending out join request for SpoVNet " << spovnetId.toString()
1496                );
1497
1498                // send join request message
1499                OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1500                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1501                JoinRequest joinRequest( spovnetId, nodeId );
1502                overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
1503
1504                send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1505               
1506                return;
1507        }
1508
1509        // BRANCH: link establishment from remote, add one!
1510        if (ld == NULL) {
1511                ld = addDescriptor( id );
1512                logging_info( "onLinkUp (remote request) descriptor: " << ld );
1513
1514                // update descriptor
1515                ld->fromRemote = true;
1516                ld->communicationId = id;
1517                ld->communicationUp = true;
1518                ld->setAutoUsed();
1519                ld->setAlive();
1520
1521                // in this case, do not inform listener, since service it unknown
1522                // -> wait for update message!
1523        }
1524       
1525        // BRANCH: We requested this link in the first place
1526        else
1527        {
1528                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1529
1530                // update descriptor
1531                ld->setAutoUsed();
1532                ld->setAlive();
1533                ld->communicationUp = true;
1534                ld->fromRemote = false;
1535
1536                // BRANCH: this was a relayed link before --> convert to direct link
1537                //   TODO do we really have to send a message here?
1538                if (ld->relayed)
1539                {
1540                        ld->up = true;
1541                        ld->relayed = false;
1542                        logging_info( "Converting to direct link: " << ld );
1543                       
1544                        // send message
1545                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1546                        overMsg.setSourceLink( ld->overlayId );
1547                        overMsg.setDestinationLink( ld->remoteLink );
1548                        send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
1549                       
1550                    // inform listener
1551                    if( ld->listener != NULL)
1552                        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1553                }
1554               
1555
1556        /* NOTE: Chord is opening direct-links in it's setup routine which are
1557         *   neither set to "relayed" nor to "up". To activate these links a
1558         *   typeLinkUpdate must be sent.
1559         *   
1560         * This branch is would also be taken when we had a working link before
1561         *   (ld->up == true). I'm not sure if this case does actually happen
1562         *   and whether it's tested.
1563         */
1564                else
1565                {
1566                        // note: necessary to validate the link on the remote side!
1567                        logging_info( "Sending out update" <<
1568                                        " for service " << ld->service.toString() <<
1569                                        " with local node id " << nodeId.toString() <<
1570                                        " on link " << ld->overlayId.toString() );
1571
1572                        // compile and send update message
1573                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1574                        overlayMsg.setAutoLink( ld->autolink );
1575                    overlayMsg.setSourceNode(nodeId);
1576                    overlayMsg.setDestinationNode(ld->remoteNode);
1577                    overlayMsg.setSourceLink(ld->overlayId);
1578                    overlayMsg.setDestinationLink(ld->remoteLink);
1579                    overlayMsg.setService(ld->service);
1580                    overlayMsg.setRelayed(false);
1581
1582                    // TODO ld->communicationId = id ??
1583                   
1584                    send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1585                }
1586        }
1587}
1588
1589void BaseOverlay::onLinkDown(const LinkID& id,
1590        const addressing2::EndpointPtr local,
1591        const addressing2::EndpointPtr remote)
1592{
1593        // erase bootstrap links
1594        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1595        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1596
1597        // get descriptor for link
1598        LinkDescriptor* ld = getDescriptor(id, true);
1599        if ( ld == NULL ) return; // not found? ->ignore!
1600        logging_info( "onLinkDown descriptor: " << ld );
1601
1602        // removing relay link information
1603        removeRelayLink(ld->overlayId);
1604
1605        // inform listeners about link down
1606        ld->communicationUp = false;
1607        if (!ld->service.isUnspecified()) {
1608                CommunicationListener* lst = getListener(ld->service);
1609                if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1610                sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1611        }
1612
1613        // delete all queued messages (auto links)
1614        if( ld->messageQueue.size() > 0 ) {
1615                logging_warn( "Dropping link " << id.toString() << " that has "
1616                                << ld->messageQueue.size() << " waiting messages" );
1617                ld->flushQueue();
1618        }
1619
1620        // erase mapping
1621        eraseDescriptor(ld->overlayId);
1622}
1623
1624
1625void BaseOverlay::onLinkFail(const LinkID& id,
1626        const addressing2::EndpointPtr local,
1627        const addressing2::EndpointPtr remote)
1628{
1629        logging_debug( "Link fail with base communication link id=" << id );
1630
1631//      // erase bootstrap links
1632//      vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1633//      if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1634//
1635//      // get descriptor for link
1636//      LinkDescriptor* ld = getDescriptor(id, true);
1637//      if ( ld == NULL ) return; // not found? ->ignore!
1638//      logging_debug( "Link failed id=" << ld->overlayId.toString() );
1639//
1640//      // inform listeners
1641//      ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1642//      sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1643       
1644        logging_debug( "  ... calling onLinkDown ..." );
1645        onLinkDown(id, local, remote);
1646}
1647
1648
1649void BaseOverlay::onLinkChanged(const LinkID& id,
1650        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
1651        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
1652{
1653    // get descriptor for link
1654    LinkDescriptor* ld = getDescriptor(id, true);
1655    if ( ld == NULL ) return; // not found? ->ignore!
1656    logging_debug( "onLinkChanged descriptor: " << ld );
1657
1658    // inform listeners
1659    ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1660    sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1661
1662    // autolinks: refresh timestamp
1663    ld->setAutoUsed();
1664}
1665
1666//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
1667//        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
1668//        const QoSParameterSet& qos)
1669//{
1670//      logging_debug( "Link quality changed with base communication link id=" << id );
1671//
1672//      // get descriptor for link
1673//      LinkDescriptor* ld = getDescriptor(id, true);
1674//      if ( ld == NULL ) return; // not found? ->ignore!
1675//      logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1676//}
1677
1678bool BaseOverlay::onLinkRequest(const LinkID& id,
1679        const addressing2::EndpointPtr local,
1680        const addressing2::EndpointPtr remote)
1681{
1682        logging_debug("Accepting link request from " << remote->to_string() );
1683       
1684        // TODO ask application..?
1685       
1686        return true;
1687}
1688
1689
1690
1691
1692/// handles a message from base communication
1693bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
1694                const LinkID& link,
1695                const NodeID&,
1696                bool bypass_overlay )
1697{
1698        // get descriptor for link
1699        LinkDescriptor* ld = getDescriptor( link, true );
1700
1701       
1702        /* choose fastpath for direct links; normal overlay-path otherwise */   
1703        if ( bypass_overlay && ld )
1704        {
1705        // message received --> link is alive
1706        ld->keepAliveReceived = time(NULL);
1707        // hop count on this link
1708        ld->hops = 0;
1709
1710       
1711        // hand over to CommunicationListener (aka Application)
1712            CommunicationListener* lst = getListener(ld->service);
1713            if ( lst != NULL )
1714            {
1715                lst->onMessage(
1716                        message,
1717                        ld->remoteNode,
1718                        ld->overlayId,
1719                    SequenceNumber::DISABLED,
1720                        NULL );
1721               
1722                return true;
1723            }
1724
1725            return false;
1726        }
1727        else
1728        {
1729            return handleMessage( message, ld, link );     
1730        }
1731}
1732
1733// ----------------------------------------------------------------------------
1734
1735/// Handle spovnet instance join requests
1736bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
1737{
1738        // decapsulate message
1739        JoinRequest joinReq;
1740        joinReq.deserialize_from_shared_buffer(message);
1741       
1742        logging_info( "Received join request for spovnet " <<
1743                        joinReq.getSpoVNetID().toString() );
1744
1745        // check spovnet id
1746        if( joinReq.getSpoVNetID() != spovnetId ) {
1747                logging_error(
1748                                "Received join request for spovnet we don't handle " <<
1749                                joinReq.getSpoVNetID().toString() );
1750
1751                return false;
1752        }
1753
1754        // TODO: here you can implement mechanisms to deny joining of a node
1755        bool allow = true;
1756        logging_info( "Sending join reply for spovnet " <<
1757                        spovnetId.toString() << " to node " <<
1758                        source.toString() <<
1759                        ". Result: " << (allow ? "allowed" : "denied") );
1760        joiningNodes.push_back( source );
1761
1762        // return overlay parameters
1763        assert( overlayInterface != NULL );
1764        logging_debug( "Using bootstrap end-point "
1765                        << getEndpointDescriptor().toString() )
1766        OverlayParameterSet parameters = overlayInterface->getParameters();
1767       
1768       
1769        // create JoinReplay Message
1770        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1771                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1772        JoinReply replyMsg( spovnetId, parameters, allow );
1773        retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
1774
1775        // XXX This is unlovely clash between the old message system and the new one,
1776        // but a.t.m. we can't migrate everything to the new system at once..
1777        // ---> Consider the EndpointDescriptor as part of the JoinReply..
1778        retmsg.append_buffer(getEndpointDescriptor().serialize());
1779       
1780        // * send *
1781        send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
1782
1783        return true;
1784}
1785
1786/// Handle replies to spovnet instance join requests
1787bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
1788{
1789        // decapsulate message
1790        logging_debug("received join reply message");
1791        JoinReply replyMsg;
1792        EndpointDescriptor endpoints;
1793        reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
1794        buff = endpoints.deserialize(buff);
1795
1796        // correct spovnet?
1797        if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
1798                logging_error( "Received SpoVNet join reply for " <<
1799                                replyMsg.getSpoVNetID().toString() <<
1800                                " != " << spovnetId.toString() );
1801
1802                return false;
1803        }
1804
1805        // access granted? no -> fail
1806        if( !replyMsg.getJoinAllowed() ) {
1807                logging_error( "Our join request has been denied" );
1808
1809                // drop initiator link
1810                if( !bcLink.isUnspecified() ){
1811                        bc->dropLink( bcLink );
1812
1813                        vector<LinkID>::iterator it = std::find(
1814                                        bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1815                        if( it != bootstrapLinks.end() )
1816                                bootstrapLinks.erase(it);
1817                }
1818
1819                // inform all registered services of the event
1820                foreach( NodeListener* i, nodeListeners )
1821                i->onJoinFailed( spovnetId );
1822
1823                return true;
1824        }
1825
1826        // access has been granted -> continue!
1827        logging_info("Join request has been accepted for spovnet " <<
1828                        spovnetId.toString() );
1829
1830        logging_debug( "Using bootstrap end-point "
1831                        << endpoints.toString() );
1832
1833        // create overlay structure from spovnet parameter set
1834        // if we have not boostrapped yet against some other node
1835        if( overlayInterface == NULL ){
1836
1837                logging_debug("first-time bootstrapping");
1838
1839                overlayInterface = OverlayFactory::create(
1840                                *this, replyMsg.getParam(), nodeId, this );
1841
1842                // overlay structure supported? no-> fail!
1843                if( overlayInterface == NULL ) {
1844                        logging_error( "overlay structure not supported" );
1845
1846                        if( !bcLink.isUnspecified() ){
1847                                bc->dropLink( bcLink );
1848
1849                                vector<LinkID>::iterator it = std::find(
1850                                                bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1851                                if( it != bootstrapLinks.end() )
1852                                        bootstrapLinks.erase(it);
1853                        }
1854
1855                        // inform all registered services of the event
1856                        foreach( NodeListener* i, nodeListeners )
1857                        i->onJoinFailed( spovnetId );
1858
1859                        return true;
1860                }
1861
1862                // everything ok-> join the overlay!
1863                state = BaseOverlayStateCompleted;
1864                overlayInterface->createOverlay();
1865
1866                overlayInterface->joinOverlay( endpoints );
1867                overlayBootstrap.recordJoin( endpoints );
1868
1869                // update ovlvis
1870                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1871
1872                // inform all registered services of the event
1873                foreach( NodeListener* i, nodeListeners )
1874                    i->onJoinCompleted( spovnetId );
1875        }
1876        else
1877        {
1878                // this is not the first bootstrap, just join the additional node
1879                logging_debug("not first-time bootstrapping");
1880                overlayInterface->joinOverlay( endpoints );
1881                overlayBootstrap.recordJoin( endpoints );
1882        } // if( overlayInterface == NULL )
1883
1884        return true;
1885}
1886
1887
1888bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
1889{
1890        // get service
1891        const ServiceID& service = ld->service; //overlayMsg->getService();
1892
1893        logging_debug( "Received data for service " << service.toString()
1894                        << " on link " << overlayMsg->getDestinationLink().toString() );
1895
1896        // delegate data message
1897        CommunicationListener* lst = getListener(service);
1898        if(lst != NULL){
1899                lst->onMessage(
1900                                message,
1901//                              overlayMsg->getSourceNode(),
1902//                              overlayMsg->getDestinationLink(),
1903                                ld->remoteNode,
1904                                ld->overlayId,
1905                overlayMsg->getSeqNum(),
1906                                overlayMsg
1907                );
1908        }
1909
1910        return true;
1911}
1912
1913bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
1914{
1915    /**
1916     * Deserialize MessageLost-Message
1917     *
1918     * - Type of lost message
1919     * - Hop count of lost message
1920     * - Source-LinkID  of lost message
1921     */
1922    const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
1923    uint8_t type = buff[0];
1924    uint8_t hops = buff[1];
1925    LinkID linkid;
1926    linkid.deserialize(message(sizeof(uint8_t)*2));
1927   
1928    logging_warn("Node " << msg->getSourceNode()
1929            << " informed us, that our message of type " << (int) type
1930            << " is lost after traveling " << (int) hops << " hops."
1931            << " (LinkID: " << linkid.toString());
1932
1933   
1934    // TODO switch-case ?
1935   
1936    // BRANCH: LinkRequest --> link request failed
1937    if ( type == OverlayMsg::typeLinkRequest )
1938    {
1939        __onLinkEstablishmentFailed(linkid);
1940    }
1941   
1942    // BRANCH: Data --> link disrupted. Drop link.
1943    //   (We could use something more advanced here. e.g. At least send a
1944    //    keep-alive message and wait for a keep-alive reply.)
1945    if ( type == OverlayMsg::typeData )
1946    {
1947        LinkDescriptor* link_desc = getDescriptor(linkid);
1948       
1949        if ( link_desc )
1950        {
1951            link_desc->failed = true;
1952        }
1953       
1954        dropLink(linkid);
1955    }
1956   
1957    // BRANCH: ping lost
1958    if ( type == OverlayMsg::typePing )
1959    {
1960        CommunicationListener* lst = getListener(msg->getService());
1961        if( lst != NULL )
1962        {
1963            lst->onPingLost(msg->getSourceNode());
1964        }
1965    }
1966   
1967    return true;
1968}
1969
1970bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
1971{
1972    // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
1973   
1974    bool send_pong = false;
1975   
1976    // inform application and ask permission to send a pong message
1977    CommunicationListener* lst = getListener(overlayMsg->getService());
1978    if( lst != NULL )
1979    {
1980        send_pong = lst->onPing(overlayMsg->getSourceNode());
1981    }
1982   
1983    // send pong message if allowed
1984    if ( send_pong )
1985    {
1986        OverlayMsg pong_msg(OverlayMsg::typePong);
1987        pong_msg.setSeqNum(overlayMsg->getSeqNum());
1988       
1989        // send message
1990        try
1991        {
1992            send_node( &pong_msg, 
1993                overlayMsg->getSourceNode(), 
1994                system_priority::OVERLAY,
1995                overlayMsg->getService() );
1996        }
1997        catch ( message_not_sent& e )
1998        {
1999            logging_info("Could not send Pong-Message to node: " << 
2000                overlayMsg->getSourceNode());
2001        }
2002    }
2003}
2004
2005bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2006{
2007    // inform application
2008    CommunicationListener* lst = getListener(overlayMsg->getService());
2009    if( lst != NULL )
2010    {
2011        lst->onPong(overlayMsg->getSourceNode());
2012    }
2013}
2014
2015bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2016
2017        if( ld == NULL ) {
2018                logging_warn( "received overlay update message for link for "
2019                                << "which we have no mapping" );
2020                return false;
2021        }
2022        logging_info("Received type update message on link " << ld );
2023
2024        // update our link mapping information for this link
2025        bool changed =
2026                        ( ld->remoteNode != overlayMsg->getSourceNode() )
2027                        || ( ld->service != overlayMsg->getService() );
2028
2029        // set parameters
2030        ld->up         = true;
2031        ld->remoteNode = overlayMsg->getSourceNode();
2032        ld->remoteLink = overlayMsg->getSourceLink();
2033        ld->service    = overlayMsg->getService();
2034        ld->autolink   = overlayMsg->isAutoLink();
2035
2036        // if our link information changed, we send out an update, too
2037        if( changed ) {
2038                overlayMsg->swapRoles();
2039                overlayMsg->setSourceNode(nodeId);
2040                overlayMsg->setSourceLink(ld->overlayId);
2041                overlayMsg->setService(ld->service);
2042                send( overlayMsg, ld, system_priority::OVERLAY );
2043        }
2044
2045        // service registered? no-> error!
2046        if( !communicationListeners.contains( ld->service ) ) {
2047                logging_warn( "Link up: event listener has not been registered" );
2048                return false;
2049        }
2050
2051        // default or no service registered?
2052        CommunicationListener* listener = communicationListeners.get( ld->service );
2053        if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
2054                logging_warn("Link up: event listener is default or null!" );
2055                return true;
2056        }
2057
2058        // update descriptor
2059        ld->listener = listener;
2060        ld->setAutoUsed();
2061        ld->setAlive();
2062
2063        // ask the service whether it wants to accept this link
2064        if( !listener->onLinkRequest(ld->remoteNode) ) {
2065
2066                logging_debug("Link id=" << ld->overlayId.toString() <<
2067                                " has been denied by service " << ld->service.toString() << ", dropping link");
2068
2069                // prevent onLinkDown calls to the service
2070                ld->listener = &CommunicationListener::DEFAULT;
2071
2072                // drop the link
2073                dropLink( ld->overlayId );
2074                return true;
2075        }
2076
2077        // set link up
2078        ld->up = true;
2079        logging_info( "Link has been accepted by service and is up: " << ld );
2080
2081        // auto links: link has been accepted -> send queued messages
2082        if( ld->messageQueue.size() > 0 ) {
2083                logging_info( "Sending out queued messages on link " << ld );
2084        foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
2085        {
2086            sendMessage( msg.message, ld->overlayId, msg.priority );
2087        }
2088                ld->messageQueue.clear();
2089        }
2090
2091        // call the notification functions
2092        listener->onLinkUp( ld->overlayId, ld->remoteNode );
2093        sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
2094
2095        return true;
2096}
2097
2098/// handle a link request and reply
2099bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2100
2101        //TODO: Check if a request has already been sent using getSourceLink() ...
2102
2103        // create link descriptor
2104        LinkDescriptor* ldn = addDescriptor();
2105
2106        // flags
2107        ldn->up = true;
2108        ldn->fromRemote = true;
2109        ldn->relayed = true;
2110
2111        // parameters
2112        ldn->service = overlayMsg->getService();
2113        ldn->listener = getListener(ldn->service);
2114        ldn->remoteNode = overlayMsg->getSourceNode();
2115        ldn->remoteLink = overlayMsg->getSourceLink();
2116        ldn->hops = overlayMsg->getNumHops();
2117       
2118    // initialize sequence numbers
2119    ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
2120    logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
2121   
2122   
2123        // update time-stamps
2124        ldn->setAlive();
2125        ldn->setAutoUsed();
2126
2127        logging_info( "Link request received from node id="
2128                << overlayMsg->getSourceNode()
2129                << " LINK: "
2130                << ldn);
2131       
2132        // create reply message and send back!
2133        overlayMsg->swapRoles(); // swap source/destination
2134        overlayMsg->setType(OverlayMsg::typeLinkReply);
2135        overlayMsg->setSourceLink(ldn->overlayId);
2136        overlayMsg->setRelayed(true);
2137//      overlayMsg->setRouteRecord(true);
2138    overlayMsg->setSeqNum(ld->last_sent_seqnum);
2139       
2140        // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
2141        // append our endpoints (for creation of a direct link)
2142        overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
2143       
2144        send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
2145
2146        // inform listener
2147        if(ldn != NULL && ldn->listener != NULL)
2148                ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2149
2150        return true;
2151}
2152
2153bool BaseOverlay::handleLinkReply(
2154        OverlayMsg* overlayMsg,
2155        reboost::shared_buffer_t sub_message,
2156        LinkDescriptor* ld )
2157{
2158    // deserialize EndpointDescriptor
2159    EndpointDescriptor endpoints;
2160    endpoints.deserialize(sub_message);
2161   
2162        // find link request
2163        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
2164
2165        // not found? yes-> drop with error!
2166        if (ldn == NULL) {
2167                logging_error( "No link request pending for "
2168                                << overlayMsg->getDestinationLink().toString() );
2169                return false;
2170        }
2171        logging_debug("Handling link reply for " << ldn )
2172
2173        // check if already up
2174        if (ldn->up) {
2175                logging_warn( "Link already up: " << ldn );
2176                return true;
2177        }
2178
2179        // debug message
2180        logging_info( "Link request reply received. Establishing link"
2181                        << " for service " << overlayMsg->getService().toString()
2182                        << " with local id=" << overlayMsg->getDestinationLink()
2183                        << " and remote link id=" << overlayMsg->getSourceLink()
2184                        << " to " << endpoints.toString()
2185                        << " hop count: " << overlayMsg->getRouteRecord().size()
2186        );
2187
2188        // set local link descriptor data
2189        ldn->up = true;
2190        ldn->relayed = true;
2191        ldn->service = overlayMsg->getService();
2192        ldn->listener = getListener(ldn->service);
2193        ldn->remoteLink = overlayMsg->getSourceLink();
2194        ldn->remoteNode = overlayMsg->getSourceNode();
2195
2196        // update timestamps
2197        ldn->setAlive();
2198        ldn->setAutoUsed();
2199
2200        // auto links: link has been accepted -> send queued messages
2201        if( ldn->messageQueue.size() > 0 ) {
2202                logging_info( "Sending out queued messages on link " <<
2203                                ldn->overlayId.toString() );
2204                foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
2205                {
2206                        sendMessage( msg.message, ldn->overlayId, msg.priority );
2207                }
2208                ldn->messageQueue.clear();
2209        }
2210
2211        // inform listeners about new link
2212        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2213
2214        // try to replace relay link with direct link
2215        ldn->retryCounter = 3;
2216        ldn->endpoint = endpoints;
2217        ldn->communicationId =  bc->establishLink( ldn->endpoint );
2218
2219        return true;
2220}
2221
2222/// handle a keep-alive message for a link
2223bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2224{
2225        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
2226       
2227        if ( rld != NULL )
2228        {
2229                logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
2230                if (overlayMsg->isRouteRecord())
2231                {
2232                        rld->routeRecord = overlayMsg->getRouteRecord();
2233                }
2234               
2235                // set alive
2236                rld->setAlive();
2237               
2238               
2239                /* answer keep alive */
2240                if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
2241                {
2242            time_t now = time(NULL);
2243            logging_debug("[BaseOverlay] Answering KeepAlive over "
2244                    << ld->to_string()
2245                    << " after "
2246                    << difftime( now, ld->keepAliveSent )
2247                    << "s");
2248           
2249            OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
2250                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
2251            msg.setRouteRecord(true);
2252            ld->keepAliveSent = now;
2253            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
2254                }
2255
2256                return true;
2257        }
2258        else
2259        {
2260                logging_error("No Keep-Alive for "
2261                                << overlayMsg->getDestinationLink() << ": link unknown." );
2262                return false;
2263        }
2264}
2265
2266/// handle a direct link message
2267bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2268        logging_debug( "Received direct link replacement request" );
2269
2270        /// get destination overlay link
2271        LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
2272        if (rld == NULL || ld == NULL) {
2273                logging_error("Direct link replacement: Link "
2274                                << overlayMsg->getDestinationLink() << "not found error." );
2275                return false;
2276        }
2277        logging_info( "Received direct link convert notification for " << rld );
2278
2279        // update information
2280        rld->communicationId = ld->communicationId;
2281        rld->communicationUp = true;
2282        rld->relayed = false;
2283
2284        // mark used and alive!
2285        rld->setAlive();
2286        rld->setAutoUsed();
2287
2288        // erase the original descriptor
2289        eraseDescriptor(ld->overlayId);
2290       
2291    // inform listener
2292    if( rld->listener != NULL)
2293        rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
2294       
2295        return true;
2296}
2297
2298/// handles an incoming message
2299bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
2300                const LinkID bcLink )
2301{
2302        // decapsulate overlay message
2303        OverlayMsg* overlayMsg = new OverlayMsg();
2304        reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
2305
2306//      // XXX debug
2307//      logging_info( "Received overlay message."
2308//              << " Hops: " << (int) overlayMsg->getNumHops()
2309//              << " Type: " << (int) overlayMsg->getType()
2310//              << " Payload size: " << sub_buff.size()
2311//             << " SeqNum: " << overlayMsg->getSeqNum() );
2312       
2313       
2314        // increase number of hops
2315        overlayMsg->increaseNumHops();
2316
2317        // refresh relay information
2318        refreshRelayInformation( overlayMsg, ld );
2319
2320        // update route record
2321        overlayMsg->addRouteRecord(nodeId);
2322
2323        // handle signaling messages (do not route!)
2324        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
2325                        overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
2326        {
2327                overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
2328                delete overlayMsg;
2329                return true;
2330        }
2331
2332        // message for reached destination? no-> route message
2333        if (!overlayMsg->getDestinationNode().isUnspecified() &&
2334                        overlayMsg->getDestinationNode() != nodeId ) {
2335                logging_debug("Routing message "
2336                                << " from " << overlayMsg->getSourceNode()
2337                                << " to " << overlayMsg->getDestinationNode()
2338                );
2339               
2340//              // XXX testing AKTUELL
2341//        logging_info("MARIO: Routing message "
2342//                << " from " << overlayMsg->getSourceNode()
2343//                << " to " << overlayMsg->getDestinationNode() );
2344//        logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
2345                overlayMsg->append_buffer(sub_buff);
2346               
2347                route( overlayMsg, ld->remoteNode );
2348                delete overlayMsg;
2349                return true;
2350        }
2351
2352       
2353        /* handle base overlay message */
2354        bool ret = false; // return value
2355        try
2356        {
2357        switch ( overlayMsg->getType() ) 
2358        {
2359            // data transport messages
2360            case OverlayMsg::typeData:
2361            {
2362                // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
2363                LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
2364               
2365                if ( ! end_to_end_ld )
2366                {
2367                    logging_warn("Error: Data-Message claims to belong to a link we don't know.");
2368                   
2369                    ret = false;
2370                }
2371                else
2372                {
2373                    // message received --> link is alive
2374                    end_to_end_ld->keepAliveReceived = time(NULL);
2375                    // hop count on this link
2376                    end_to_end_ld->hops = overlayMsg->getNumHops();
2377                   
2378                    // * call handler *
2379                    ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
2380                }
2381               
2382                break;
2383            }
2384            case OverlayMsg::typeMessageLost:
2385                ret = handleLostMessage(sub_buff, overlayMsg);
2386               
2387                break;
2388       
2389                // overlay setup messages
2390            case OverlayMsg::typeJoinRequest:
2391                ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink );        break;
2392            case OverlayMsg::typeJoinReply:
2393                ret = handleJoinReply(sub_buff, bcLink );       break;
2394       
2395                // link specific messages
2396            case OverlayMsg::typeLinkRequest:
2397                ret = handleLinkRequest(overlayMsg, ld );       break;
2398            case OverlayMsg::typeLinkReply:
2399                ret = handleLinkReply(overlayMsg, sub_buff, ld );       break;
2400            case OverlayMsg::typeLinkUpdate:
2401                ret = handleLinkUpdate(overlayMsg, ld );        break;
2402            case OverlayMsg::typeKeepAlive:
2403            case OverlayMsg::typeKeepAliveReply:
2404                ret = handleLinkAlive(overlayMsg, ld );         break;
2405            case OverlayMsg::typeLinkDirect:
2406                ret = handleLinkDirect(overlayMsg, ld );        break;
2407               
2408            case OverlayMsg::typeLinkClose:
2409            {
2410                dropLink(overlayMsg->getDestinationLink());
2411                __removeDroppedLink(overlayMsg->getDestinationLink());
2412               
2413                break;
2414            }
2415           
2416            /// ping over overlay path (or similar)
2417            case OverlayMsg::typePing:
2418            {
2419                ret = handlePing(overlayMsg, ld);
2420                break;
2421            }
2422            case OverlayMsg::typePong:
2423            {
2424                ret = handlePong(overlayMsg, ld);
2425                break;
2426            }
2427           
2428                // handle unknown message type
2429            default:
2430            {
2431                logging_error( "received message in invalid state! don't know " <<
2432                        "what to do with this message of type " << overlayMsg->getType() );
2433                ret = false;
2434                break;
2435            }
2436        }
2437        }
2438        catch ( reboost::illegal_sub_buffer& e )
2439        {
2440            logging_error( "Failed to create sub-buffer while reading message: »"
2441                    << e.what()
2442                    << "« Message too short? ");
2443           
2444            assert(false); // XXX
2445        }
2446
2447        // free overlay message and return value
2448        delete overlayMsg;
2449        return ret;
2450}
2451
2452// ----------------------------------------------------------------------------
2453
2454void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
2455
2456        logging_debug( "broadcasting message to all known nodes " <<
2457                        "in the overlay from service " + service.toString() );
2458
2459        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
2460        for(size_t i=0; i<nodes.size(); i++){
2461                NodeID& id = nodes.at(i);
2462                if(id == this->nodeId) continue; // don't send to ourselfs
2463
2464                sendMessage( message, id, priority, service );
2465        }
2466}
2467
2468/// return the overlay neighbors
2469vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
2470        // the known nodes _can_ also include our node, so we remove ourself
2471        vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
2472        vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
2473        if( i != nodes.end() ) nodes.erase( i );
2474        return nodes;
2475}
2476
2477const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
2478        if( lid == LinkID::UNSPECIFIED ) return nodeId;
2479        const LinkDescriptor* ld = getDescriptor(lid);
2480        if( ld == NULL ) return NodeID::UNSPECIFIED;
2481        else return ld->remoteNode;
2482}
2483
2484vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
2485        vector<LinkID> linkvector;
2486        foreach( LinkDescriptor* ld, links ) {
2487                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
2488                        linkvector.push_back( ld->overlayId );
2489                }
2490        }
2491        return linkvector;
2492}
2493
2494
2495void BaseOverlay::onNodeJoin(const NodeID& node) {
2496        JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
2497        if( i == joiningNodes.end() ) return;
2498
2499        logging_info( "node has successfully joined baseoverlay and overlay structure "
2500                        << node.toString() );
2501
2502        joiningNodes.erase( i );
2503}
2504
2505void BaseOverlay::eventFunction() {
2506        stabilizeRelays();
2507        stabilizeLinks();
2508        updateVisual();
2509}
2510
2511
2512
2513/* link status */
2514bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
2515{
2516    const LinkDescriptor* ld = getDescriptor(lnk);
2517   
2518    if (!ld)
2519        return false;
2520   
2521    return ld->communicationUp && !ld->relayed;
2522}
2523
2524int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
2525{
2526    const LinkDescriptor* ld = getDescriptor(lnk);
2527   
2528    if (!ld)
2529        return -1;
2530   
2531    return ld->hops;   
2532}
2533
2534
2535bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
2536{
2537    time_t now = time(NULL);
2538
2539    return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
2540}
2541
2542bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
2543{
2544    return isLinkVital(link) && link->communicationUp && !link->relayed;
2545}
2546
2547/* [link status] */
2548
2549
2550void BaseOverlay::updateVisual(){
2551
2552        //
2553        // update base overlay structure
2554        //
2555
2556        static NodeID pre = NodeID::UNSPECIFIED;
2557        static NodeID suc = NodeID::UNSPECIFIED;
2558
2559        vector<NodeID> nodes = this->getOverlayNeighbors(false);
2560
2561        if(nodes.size() == 0){
2562
2563                if(pre != NodeID::UNSPECIFIED){
2564                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2565                        pre = NodeID::UNSPECIFIED;
2566                }
2567                if(suc != NodeID::UNSPECIFIED){
2568                        visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2569                        suc = NodeID::UNSPECIFIED;
2570                }
2571
2572        } // if(nodes.size() == 0)
2573
2574        if(nodes.size() == 1){
2575                // only one node, make this pre and succ
2576                // and then go into the node.size()==2 case
2577                //nodes.push_back(nodes.at(0));
2578
2579                if(pre != nodes.at(0)){
2580                        pre = nodes.at(0);
2581                        if(pre != NodeID::UNSPECIFIED)
2582                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2583                }
2584        }
2585
2586        if(nodes.size() == 2){
2587
2588                // old finger
2589                if(nodes.at(0) != pre){
2590                        if(pre != NodeID::UNSPECIFIED)
2591                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2592                        pre = NodeID::UNSPECIFIED;
2593                }
2594                if(nodes.at(1) != suc){
2595                        if(suc != NodeID::UNSPECIFIED)
2596                                visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2597                        suc = NodeID::UNSPECIFIED;
2598                }
2599
2600                // connect with fingers
2601                if(pre == NodeID::UNSPECIFIED){
2602                        pre = nodes.at(0);
2603                        if(pre != NodeID::UNSPECIFIED)
2604                                visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2605                }
2606                if(suc == NodeID::UNSPECIFIED){
2607                        suc = nodes.at(1);
2608                        if(suc != NodeID::UNSPECIFIED)
2609                                visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
2610                }
2611
2612        } //if(nodes.size() == 2)
2613
2614//      {
2615//              logging_error("================================");
2616//              logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
2617//              logging_error("================================");
2618//              if(nodes.size()>= 1){
2619//                      logging_error("real pre " << nodes.at(0).toString());
2620//                      logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
2621//              }
2622//              if(nodes.size()>= 2){
2623//                      logging_error("real suc " << nodes.at(1).toString());
2624//                      logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
2625//              }
2626//              logging_error("================================");
2627//              if(pre == NodeID::UNSPECIFIED){
2628//                      logging_error("pre: unspecified");
2629//              }else{
2630//                      unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
2631//                      logging_error("pre: " << prei);
2632//              }
2633//              if(suc == NodeID::UNSPECIFIED){
2634//                      logging_error("suc: unspecified");
2635//              }else{
2636//                      unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
2637//                      logging_error("suc: " << suci);
2638//              }
2639//              logging_error("================================");
2640//      }
2641
2642        //
2643        // update base communication links
2644        //
2645
2646        static set<NodeID> linkset;
2647        set<NodeID> remotenodes;
2648        foreach( LinkDescriptor* ld, links ) {
2649                if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
2650                        continue;
2651
2652                if (ld->routeRecord.size()>1 && ld->relayed) {
2653                        for (size_t i=1; i<ld->routeRecord.size(); i++)
2654                                remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
2655                } else {
2656                        remotenodes.insert(ld->remoteNode);
2657                }
2658        }
2659
2660        // which links are old and need deletion?
2661        bool changed = false;
2662
2663        do{
2664                changed = false;
2665                foreach(NodeID n, linkset){
2666                        if(remotenodes.find(n) == remotenodes.end()){
2667                                visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
2668                                linkset.erase(n);
2669                                changed = true;
2670                                break;
2671                        }
2672                }
2673        }while(changed);
2674
2675        // which links are new and need creation?
2676        do{
2677                changed = false;
2678                foreach(NodeID n, remotenodes){
2679                        if(linkset.find(n) == linkset.end()){
2680                                visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
2681                                linkset.insert(n);
2682                                changed = true;
2683                                break;
2684                        }
2685                }
2686        }while(changed);
2687
2688}
2689
2690// ----------------------------------------------------------------------------
2691
2692std::string BaseOverlay::debugInformation() {
2693        std::stringstream s;
2694        int i=0;
2695
2696        // dump overlay information
2697        s << "Long debug info ... [see below]" << endl << endl;
2698        s << "--- overlay information ----------------------" << endl;
2699        s << overlayInterface->debugInformation() << endl;
2700
2701        // dump link state
2702        s << "--- link state -------------------------------" << endl;
2703        foreach( LinkDescriptor* ld, links ) {
2704                s << "link " << i << ": " << ld << endl;
2705                i++;
2706        }
2707        s << endl << endl;
2708
2709        return s.str();
2710}
2711
2712}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.