An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/BaseOverlay.cpp @ 6584

Last change on this file since 6584 was 6584, checked in by Christoph Mayer, 14 years ago

visual stuff moved, prepare for 3d

File size: 57.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/DHTMessage.h"
54#include "ariba/overlay/messages/JoinRequest.h"
55#include "ariba/overlay/messages/JoinReply.h"
56
57#include "ariba/utility/visual/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62class ValueEntry {
63public:
64        ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
65                last_change(time(NULL)), value(value.clone()) {
66        }
67
68        ~ValueEntry()  {
69                value.release();
70        }
71
72        void refresh() {
73                last_update = time(NULL);
74        }
75
76        void set_value( const Data& value ) {
77                this->value.release();
78                this->value = value.clone();
79                this->last_change = time(NULL);
80                this->last_update = time(NULL);
81        }
82
83        Data get_value() const {
84                return value;
85        }
86
87        uint16_t get_ttl() const {
88                return ttl;
89        }
90
91        void set_ttl( uint16_t ttl ) {
92                this->ttl = ttl;
93        }
94
95        bool is_ttl_elapsed() const {
96                // is persistent? yes-> always return false
97                if (ttl==0) return false;
98
99                // return true, if ttl is elapsed
100                return ( difftime( time(NULL), this->last_update ) >= ttl );
101        }
102
103private:
104        uint16_t ttl;
105        time_t last_update;
106        time_t last_change;
107        Data value;
108};
109
110class DHTEntry {
111public:
112        Data key;
113        vector<ValueEntry> values;
114
115        vector<Data> get_values() {
116                vector<Data> vect;
117                BOOST_FOREACH( ValueEntry& e, values )
118                        vect.push_back( e.get_value() );
119                return vect;
120        }
121};
122
123class DHT {
124public:
125        typedef vector<DHTEntry> Entries;
126        typedef vector<ValueEntry> Values;
127        Entries entries;
128
129        static bool equals( const Data& lhs, const Data& rhs ) {
130                if (rhs.getLength()!=lhs.getLength()) return false;
131                for (int i=0; i<lhs.getLength()/8; i++)
132                        if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
133                return true;
134        }
135
136        void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
137
138                // find entry
139                for (size_t i=0; i<entries.size(); i++) {
140                        DHTEntry& entry = entries.at(i);
141
142                        // check if key is already known
143                        if ( equals(entry.key, key) ) {
144
145                                // check if value is already in values list
146                                for (size_t j=0; j<entry.values.size(); j++) {
147                                        // found value already? yes-> refresh ttl
148                                        if ( equals(entry.values[j].get_value(), value) ) {
149                                                entry.values[j].refresh();
150                                                std::cout << "DHT: Republished value. Refreshing TTL."
151                                                        << std::endl;
152                                                return;
153                                        }
154                                }
155
156                                // new value-> add to entry
157                                std::cout << "DHT: Added value to "
158                                        << " key=" << key << " with value=" << value << std::endl;
159                                entry.values.push_back( ValueEntry( value ) );
160                                entry.values.back().set_ttl(ttl);
161                                return;
162                        }
163                }
164
165                // key is unknown-> add key value pair
166                std::cout << "DHT: New key value pair "
167                        << " key=" << key << " with value=" << value << std::endl;
168
169                // add new entry
170                entries.push_back( DHTEntry() );
171                DHTEntry& entry = entries.back();
172                entry.key = key.clone();
173                entry.values.push_back( ValueEntry(value) );
174                entry.values.back().set_ttl(ttl);
175        }
176
177        vector<Data> get( const Data& key ) {
178                // find entry
179                for (size_t i=0; i<entries.size(); i++) {
180                        DHTEntry& entry = entries.at(i);
181                        if ( equals(entry.key,key) )
182                                return entry.get_values();
183                }
184                return vector<Data>();
185        }
186
187        bool remove( const Data& key ) {
188                // find entry
189                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
190                        DHTEntry& entry = *i;
191
192                        // found? yes-> delete entry
193                        if ( equals(entry.key, key) ) {
194                                i = entries.erase(i);
195                                return true;
196                        }
197                }
198                return false;
199        }
200
201        bool remove( const Data& key, const Data& value ) {
202                // find entry
203                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
204                        DHTEntry& entry = *i;
205
206                        // found? yes-> try to find value
207                        if ( equals(entry.key, key) ) {
208                                for (Values::iterator j = entry.values.begin();
209                                                j != entry.values.end(); j++) {
210
211                                        // value found? yes-> delete
212                                        if (equals(j->get_value(), value)) {
213                                                j = entry.values.erase(j);
214                                                return true;
215                                        }
216                                }
217                        }
218                }
219                return false;
220        }
221
222        void cleanup() {
223                // find entry
224                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
225                        DHTEntry& entry = *i;
226
227                        for (Values::iterator j = entry.values.begin();
228                                        j != entry.values.end(); j++) {
229
230                                // value found? yes-> delete
231                                if (j->is_ttl_elapsed())
232                                        j = entry.values.erase(j);
233                        }
234
235                        if (entry.values.size()==0) i = entries.erase(i);
236                }
237        }
238};
239
240// ----------------------------------------------------------------------------
241
242/* *****************************************************************************
243 * PREREQUESITES
244 * ****************************************************************************/
245
246CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
247        if( !communicationListeners.contains( service ) ) {
248                logging_error( "No listener found for service " << service.toString() );
249                return NULL;
250        }
251        CommunicationListener* listener = communicationListeners.get( service );
252        assert( listener != NULL );
253        return listener;
254}
255
256// link descriptor handling ----------------------------------------------------
257
258LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
259        BOOST_FOREACH( LinkDescriptor* lp, links )
260                if ((communication ? lp->communicationId : lp->overlayId) == link)
261                        return lp;
262        return NULL;
263}
264
265const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
266        BOOST_FOREACH( const LinkDescriptor* lp, links )
267                if ((communication ? lp->communicationId : lp->overlayId) == link)
268                        return lp;
269        return NULL;
270}
271
272/// erases a link descriptor
273void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
274        for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
275                LinkDescriptor* ld = *i;
276                if ((communication ? ld->communicationId : ld->overlayId) == link) {
277                        delete ld;
278                        links.erase(i);
279                        break;
280                }
281        }
282}
283
284/// adds a link descriptor
285LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
286        LinkDescriptor* desc = getDescriptor( link );
287        if ( desc == NULL ) {
288                desc = new LinkDescriptor();
289                if (!link.isUnspecified()) desc->overlayId = link;
290                links.push_back(desc);
291        }
292        return desc;
293}
294
295/// returns a auto-link descriptor
296LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
297        // search for a descriptor that is already up
298        BOOST_FOREACH( LinkDescriptor* lp, links )
299                if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
300                        return lp;
301        // if not found, search for one that is about to come up...
302        BOOST_FOREACH( LinkDescriptor* lp, links )
303                if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
304                        return lp;
305        return NULL;
306}
307
308/// stabilizes link information
309void BaseOverlay::stabilizeLinks() {
310        // send keep-alive messages over established links
311        BOOST_FOREACH( LinkDescriptor* ld, links ) {
312                if (!ld->up) continue;
313                OverlayMsg msg( OverlayMsg::typeLinkAlive,
314                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
315                if (ld->relayed) msg.setRouteRecord(true);
316                send_link( &msg, ld->overlayId );
317        }
318
319        // iterate over all links and check for time boundaries
320        vector<LinkDescriptor*> oldlinks;
321        time_t now = time(NULL);
322        BOOST_FOREACH( LinkDescriptor* ld, links ) {
323
324                // keep alives and not up? yes-> link connection request is stale!
325                if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
326
327                        // increase counter
328                        ld->keepAliveMissed++;
329
330                        // missed more than four keep-alive messages (10 sec)? -> drop link
331                        if (ld->keepAliveMissed > 4) {
332                                logging_info( "Link connection request is stale, closing: " << ld );
333                                oldlinks.push_back( ld );
334                                continue;
335                        }
336                }
337
338                if (!ld->up) continue;
339
340                // remote used as relay flag
341                if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
342                        ld->relaying = false;
343
344                // drop links that are dropped and not used as relay
345                if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
346                        oldlinks.push_back( ld );
347                        continue;
348                }
349
350                // auto-link time exceeded?
351                if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
352                        oldlinks.push_back( ld );
353                        continue;
354                }
355
356                // keep alives missed? yes->
357                if ( difftime( now, ld->keepAliveTime ) > 2 ) {
358
359                        // increase counter
360                        ld->keepAliveMissed++;
361
362                        // missed more than four keep-alive messages (4 sec)? -> drop link
363                        if (ld->keepAliveMissed >= 4) {
364                                logging_info( "Link is stale, closing: " << ld );
365                                oldlinks.push_back( ld );
366                                continue;
367                        }
368                }
369        }
370
371        // drop links
372        BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
373                logging_info( "Link timed out. Dropping " << ld );
374                ld->relaying = false;
375                dropLink( ld->overlayId );
376        }
377
378        // show link state
379        counter++;
380        if (counter>=4) showLinks();
381        if (counter>=4 || counter<0) counter = 0;
382}
383
384
385std::string BaseOverlay::getLinkHTMLInfo() {
386        std::ostringstream s;
387        vector<NodeID> nodes;
388        if (links.size()==0) {
389                s << "<h2 style=\"color=#606060\">No links established!</h2>";
390        } else {
391                s << "<h2 style=\"color=#606060\">Links</h2>";
392                s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
393                s << "<tr style=\"background-color=#ffe0e0\">";
394                s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
395                s << "</tr>";
396
397                int i=0;
398                BOOST_FOREACH( LinkDescriptor* ld, links ) {
399                        if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
400                        bool found = false;
401                        BOOST_FOREACH(NodeID& id, nodes)
402                                if (id  == ld->remoteNode) found = true;
403                        if (found) continue;
404                        i++;
405                        nodes.push_back(ld->remoteNode);
406                        if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
407                                else s << "<tr>";
408                        s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
409                        s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
410                        s << "<td>";
411                        if (ld->routeRecord.size()>1 && ld->relayed) {
412                                for (size_t i=1; i<ld->routeRecord.size(); i++)
413                                        s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
414                        } else {
415                                s << "Direct";
416                        }
417                        s << "</td>";
418                        s << "</tr>";
419                }
420                s << "</table>";
421        }
422        return s.str();
423}
424
425/// shows the current link state
426void BaseOverlay::showLinks() {
427        int i=0;
428        logging_info("--- link state -------------------------------");
429        BOOST_FOREACH( LinkDescriptor* ld, links ) {
430                logging_info("link " << i << ": " << ld);
431                i++;
432        }
433        logging_info("----------------------------------------------");
434}
435
436/// compares two arbitrary links to the same node
437int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
438        LinkDescriptor* lhsld = getDescriptor(lhs);
439        LinkDescriptor* rhsld = getDescriptor(rhs);
440        if (lhsld==NULL || rhsld==NULL
441                || !lhsld->up || !rhsld->up
442                || lhsld->remoteNode != rhsld->remoteNode) return -1;
443
444        if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId)  )
445                return -1;
446
447        return 1;
448}
449
450
451// internal message delivery ---------------------------------------------------
452
453/// routes a message to its destination node
454void BaseOverlay::route( OverlayMsg* message ) {
455
456        // exceeded time-to-live? yes-> drop message
457        if (message->getNumHops() > message->getTimeToLive()) {
458                logging_warn("Message exceeded TTL. Dropping message and relay routes"
459                                "for recovery.");
460                removeRelayNode(message->getDestinationNode());
461                return;
462        }
463
464        // no-> forward message
465        else {
466                // destinastion myself? yes-> handle message
467                if (message->getDestinationNode() == nodeId) {
468                        logging_warn("Usually I should not route messages to myself!");
469                        Message msg;
470                        msg.encapsulate(message);
471                        handleMessage( &msg, NULL );
472                } else {
473                        // no->send message to next hop
474                        send( message, message->getDestinationNode() );
475                }
476        }
477}
478
479/// sends a message to another node, delivers it to the base overlay class
480seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
481        LinkDescriptor* next_link = NULL;
482
483        // drop messages to unspecified destinations
484        if (destination.isUnspecified()) return -1;
485
486        // send messages to myself -> handle message and drop warning!
487        if (destination == nodeId) {
488                logging_warn("Sent message to myself. Handling message.")
489                Message msg;
490                msg.encapsulate(message);
491                handleMessage( &msg, NULL );
492                return -1;
493        }
494
495        // use relay path?
496        if (message->isRelayed()) {
497                next_link = getRelayLinkTo( destination );
498                if (next_link != NULL) {
499                        next_link->setRelaying();
500                        return bc->sendMessage(next_link->communicationId, message);
501                } else {
502                        logging_warn("Could not send message. No relay hop found to "
503                                        << destination)
504                        return -1;
505                }
506        }
507
508        // routed message
509        else {
510                // no-> relay path! route over overlay path
511                LinkID next_id = overlayInterface->getNextLinkId( destination );
512                if (next_id.isUnspecified()) {
513                        logging_warn("Could not send message. No next hop found to " <<
514                                destination );
515                        return -1;
516                }
517
518                // get link descriptor, up and running? yes-> send message
519                next_link = getDescriptor(next_id);
520                if (next_link != NULL && next_link->up) {
521                        // send message over relayed link
522                        return send(message, next_link);
523                }
524
525                // no-> error, dropping message
526                else {
527                        logging_warn("Could not send message. Link not known or up");
528                        return -1;
529                }
530        }
531
532        // not reached-> fail
533        return -1;
534}
535
536/// send a message using a link descriptor, delivers it to the base overlay class
537seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
538        // check if null
539        if (ldr == NULL) {
540                logging_error("Can not send message to " << message->getDestinationAddress());
541                return -1;
542        }
543
544        // check if up
545        if (!ldr->up && !ignore_down) {
546                logging_error("Can not send message. Link not up:" << ldr );
547                return -1;
548        }
549        LinkDescriptor* ld = NULL;
550
551        // handle relayed link
552        if (ldr->relayed) {
553                logging_debug("Resolving direct link for relayed link to "
554                        << ldr->remoteNode);
555                ld = getRelayLinkTo( ldr->remoteNode );
556                if (ld==NULL) {
557                        logging_error("No relay path found to link " << ldr );
558                        return -1;
559                }
560                ld->setRelaying();
561                message->setRelayed(true);
562        } else
563                ld = ldr;
564
565        // handle direct link
566        if (ld->communicationUp) {
567                logging_debug("send(): Sending message over direct link.");
568                return bc->sendMessage( ld->communicationId, message );
569        } else {
570                logging_error("send(): Could not send message. "
571                                "Not a relayed link and direct link is not up.");
572                return -1;
573        }
574        return -1;
575}
576
577seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
578        const ServiceID& service) {
579        message->setSourceNode(nodeId);
580        message->setDestinationNode(remote);
581        message->setService(service);
582        send( message, remote );
583}
584
585seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
586        LinkDescriptor* ld = getDescriptor(link);
587        if (ld==NULL) {
588                logging_error("Cannot find descriptor to link id=" << link.toString());
589                return -1;
590        }
591        message->setSourceNode(nodeId);
592        message->setDestinationNode(ld->remoteNode);
593
594        message->setSourceLink(ld->overlayId);
595        message->setDestinationLink(ld->remoteLink);
596
597        message->setService(ld->service);
598        message->setRelayed(ld->relayed);
599        return send( message, ld, ignore_down );
600}
601
602// relay route management ------------------------------------------------------
603
604/// stabilize relay information
605void BaseOverlay::stabilizeRelays() {
606        vector<relay_route>::iterator i = relay_routes.begin();
607        while (i!=relay_routes.end() ) {
608                relay_route& route = *i;
609                LinkDescriptor* ld = getDescriptor(route.link);
610
611                // relay link still used and alive?
612                if (ld==NULL
613                        || !ld->isDirectVital()
614                        || difftime(route.used, time(NULL)) > 8) {
615                        logging_info("Forgetting relay information to node "
616                                        << route.node.toString() );
617                        i = relay_routes.erase(i);
618                } else
619                        i++;
620        }
621}
622
623void BaseOverlay::removeRelayLink( const LinkID& link ) {
624        vector<relay_route>::iterator i = relay_routes.begin();
625        while (i!=relay_routes.end() ) {
626                relay_route& route = *i;
627                if (route.link == link ) i = relay_routes.erase(i); else i++;
628        }
629}
630
631void BaseOverlay::removeRelayNode( const NodeID& remote ) {
632        vector<relay_route>::iterator i = relay_routes.begin();
633        while (i!=relay_routes.end() ) {
634                relay_route& route = *i;
635                if (route.node == remote ) i = relay_routes.erase(i); else i++;
636        }
637}
638
639/// refreshes relay information
640void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
641
642        // handle relayed messages from real links only
643        if (ld == NULL
644                || ld->relayed
645                || message->getSourceNode()==nodeId ) return;
646
647        // update usage information
648        if (message->isRelayed()) {
649                // try to find source node
650                BOOST_FOREACH( relay_route& route, relay_routes ) {
651                        // relay route found? yes->
652                        if ( route.node == message->getDestinationNode() ) {
653                                ld->setRelaying();
654                                route.used = time(NULL);
655                        }
656                }
657
658        }
659
660        // register relay path
661        if (message->isRegisterRelay()) {
662                // set relaying
663                ld->setRelaying();
664
665                // try to find source node
666                BOOST_FOREACH( relay_route& route, relay_routes ) {
667
668                        // relay route found? yes->
669                        if ( route.node == message->getSourceNode() ) {
670
671                                // refresh timer
672                                route.used = time(NULL);
673                                LinkDescriptor* rld = getDescriptor(route.link);
674
675                                // route has a shorter hop count or old link is dead? yes-> replace
676                                if (route.hops > message->getNumHops()
677                                        || rld == NULL
678                                        || !rld->isDirectVital()) {
679                                        logging_info("Updating relay information to node "
680                                                << route.node.toString()
681                                                << " reducing to " << message->getNumHops() << " hops.");
682                                        route.hops = message->getNumHops();
683                                        route.link = ld->overlayId;
684                                }
685                                return;
686                        }
687                }
688
689                // not found-> add new entry
690                relay_route route;
691                route.hops = message->getNumHops();
692                route.link = ld->overlayId;
693                route.node = message->getSourceNode();
694                route.used = time(NULL);
695                logging_info("Remembering relay information to node "
696                        << route.node.toString());
697                relay_routes.push_back(route);
698        }
699}
700
701/// returns a known "vital" relay link which is up and running
702LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
703        // try to find source node
704        BOOST_FOREACH( relay_route& route, relay_routes ) {
705                if (route.node == remote ) {
706                        LinkDescriptor* ld = getDescriptor( route.link );
707                        if (ld==NULL || !ld->isDirectVital()) return NULL; else {
708                                route.used = time(NULL);
709                                return ld;
710                        }
711                }
712        }
713        return NULL;
714}
715
716/* *****************************************************************************
717 * PUBLIC MEMBERS
718 * ****************************************************************************/
719
720use_logging_cpp(BaseOverlay);
721
722// ----------------------------------------------------------------------------
723
724BaseOverlay::BaseOverlay() :
725        bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
726        spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
727        sideport(&SideportListener::DEFAULT), started(false), counter(0) {
728        dht = new DHT();
729}
730
731BaseOverlay::~BaseOverlay() {
732        delete dht;
733}
734
735// ----------------------------------------------------------------------------
736
737void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
738        logging_info("Starting...");
739
740        // set parameters
741        bc = &_basecomm;
742        nodeId = _nodeid;
743
744        // register at base communication
745        bc->registerMessageReceiver( this );
746        bc->registerEventListener( this );
747
748        // timer for auto link management
749        Timer::setInterval( 1000 );
750        Timer::start();
751
752        started = true;
753        state = BaseOverlayStateInvalid;
754}
755
756void BaseOverlay::stop() {
757        logging_info("Stopping...");
758
759        // stop timer
760        Timer::stop();
761
762        // delete oberlay interface
763        if(overlayInterface != NULL) {
764                delete overlayInterface;
765                overlayInterface = NULL;
766        }
767
768        // unregister at base communication
769        bc->unregisterMessageReceiver( this );
770        bc->unregisterEventListener( this );
771
772        started = false;
773        state = BaseOverlayStateInvalid;
774}
775
776bool BaseOverlay::isStarted(){
777        return started;
778}
779
780// ----------------------------------------------------------------------------
781
782void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
783                const EndpointDescriptor& bootstrapEp) {
784
785        if(id != spovnetId){
786                logging_error("attempt to join against invalid spovnet, call initiate first");
787                return;
788        }
789
790
791        //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
792        logging_info( "Starting to join spovnet " << id.toString() <<
793                        " with nodeid " << nodeId.toString());
794
795        if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
796
797                // bootstrap against ourselfs
798                logging_debug("joining spovnet locally");
799
800                overlayInterface->joinOverlay();
801                state = BaseOverlayStateCompleted;
802                BOOST_FOREACH( NodeListener* i, nodeListeners )
803                        i->onJoinCompleted( spovnetId );
804
805                //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
806                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
807
808                logging_debug("starting overlay bootstrap module");
809                overlayBootstrap.start(this, spovnetId, nodeId);
810                overlayBootstrap.publish(bc->getEndpointDescriptor());
811
812        } else {
813
814                // bootstrap against another node
815                logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
816
817                const LinkID& lnk = bc->establishLink( bootstrapEp );
818                bootstrapLinks.push_back(lnk);
819                logging_info("join process initiated for " << id.toString() << "...");
820        }
821}
822
823void BaseOverlay::leaveSpoVNet() {
824
825        logging_info( "Leaving spovnet " << spovnetId );
826        bool ret = ( state != this->BaseOverlayStateInvalid );
827
828        logging_debug("stopping overlay bootstrap module");
829        overlayBootstrap.stop();
830        overlayBootstrap.revoke();
831
832        logging_debug( "Dropping all auto-links" );
833
834        // gather all service links
835        vector<LinkID> servicelinks;
836        BOOST_FOREACH( LinkDescriptor* ld, links ) {
837                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
838                servicelinks.push_back( ld->overlayId );
839        }
840
841        // drop all service links
842        BOOST_FOREACH( LinkID lnk, servicelinks )
843                dropLink( lnk );
844
845        // let the node leave the spovnet overlay interface
846        logging_debug( "Leaving overlay" );
847        if( overlayInterface != NULL )
848                overlayInterface->leaveOverlay();
849
850        // drop still open bootstrap links
851        BOOST_FOREACH( LinkID lnk, bootstrapLinks )
852                bc->dropLink( lnk );
853
854        // change to inalid state
855        state = BaseOverlayStateInvalid;
856        //ovl.visShutdown( ovlId, nodeId, string("") );
857
858        // inform all registered services of the event
859        BOOST_FOREACH( NodeListener* i, nodeListeners ) {
860                if( ret ) i->onLeaveCompleted( spovnetId );
861                else i->onLeaveFailed( spovnetId );
862        }
863}
864
865void BaseOverlay::createSpoVNet(const SpoVNetID& id,
866                const OverlayParameterSet& param,
867                const SecurityParameterSet& sec,
868                const QoSParameterSet& qos) {
869
870        // set the state that we are an initiator, this way incoming messages are
871        // handled correctly
872        logging_info( "creating spovnet " + id.toString() <<
873                        " with nodeid " << nodeId.toString() );
874
875        spovnetId = id;
876
877        overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
878        if( overlayInterface == NULL ) {
879                logging_fatal( "overlay structure not supported" );
880                state = BaseOverlayStateInvalid;
881
882                BOOST_FOREACH( NodeListener* i, nodeListeners )
883                        i->onJoinFailed( spovnetId );
884
885                return;
886        }
887}
888
889// ----------------------------------------------------------------------------
890
891const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
892        const NodeID& remoteId, const ServiceID& service ) {
893
894        // establish link via overlay
895        if (!remoteId.isUnspecified())
896                return establishLink( remoteId, service );
897        else
898
899        // establish link directly if only ep is known
900        if (remoteId.isUnspecified())
901                return establishDirectLink(remoteEp, service );
902
903}
904
905/// call base communication's establish link and add link mapping
906const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
907        const ServiceID& service ) {
908
909        /// find a service listener
910        if( !communicationListeners.contains( service ) ) {
911                logging_error( "No listener registered for service id=" << service.toString() );
912                return LinkID::UNSPECIFIED;
913        }
914        CommunicationListener* listener = communicationListeners.get( service );
915        assert( listener != NULL );
916
917        // create descriptor
918        LinkDescriptor* ld = addDescriptor();
919        ld->relayed = false;
920        ld->listener = listener;
921        ld->service = service;
922        ld->communicationId = bc->establishLink( ep );
923
924        /// establish link and add mapping
925        logging_info("Establishing direct link " << ld->communicationId.toString()
926                << " using " << ep.toString());
927
928        return ld->communicationId;
929}
930
931/// establishes a link between two arbitrary nodes
932const LinkID BaseOverlay::establishLink( const NodeID& remote,
933        const ServiceID& service ) {
934
935        // do not establish a link to myself!
936        if (remote == nodeId) return LinkID::UNSPECIFIED;
937
938        // create a link descriptor
939        LinkDescriptor* ld = addDescriptor();
940        ld->relayed = true;
941        ld->remoteNode = remote;
942        ld->service = service;
943        ld->listener = getListener(ld->service);
944
945        // create link request message
946        OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
947        msg.setSourceLink(ld->overlayId);
948        msg.setRelayed(true);
949
950        // debug message
951        logging_info(
952                "Sending link request with"
953                << " link=" << ld->overlayId.toString()
954                << " node=" << ld->remoteNode.toString()
955                << " serv=" << ld->service.toString()
956        );
957
958        // sending message to node
959        send_node( &msg, ld->remoteNode, ld->service );
960
961        return ld->overlayId;
962}
963
964/// drops an established link
965void BaseOverlay::dropLink(const LinkID& link) {
966        logging_info( "Dropping link (initiated locally):" << link.toString() );
967
968        // find the link item to drop
969        LinkDescriptor* ld = getDescriptor(link);
970        if( ld == NULL ) {
971                logging_warn( "Can't drop link, link is unknown!");
972                return;
973        }
974
975        // delete all queued messages
976        if( ld->messageQueue.size() > 0 ) {
977                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
978                                << ld->messageQueue.size() << " waiting messages" );
979                ld->flushQueue();
980        }
981
982        // inform sideport and listener
983        ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
984        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
985
986        // do not drop relay links
987        if (!ld->relaying) {
988                // drop the link in base communication
989                if (ld->communicationUp) bc->dropLink( ld->communicationId );
990
991                // erase descriptor
992                eraseDescriptor( ld->overlayId );
993        } else {
994                ld->dropAfterRelaying = true;
995        }
996}
997
998// ----------------------------------------------------------------------------
999
1000/// internal send message, always use this functions to send messages over links
1001seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
1002        logging_debug( "Sending data message on link " << link.toString() );
1003
1004        // get the mapping for this link
1005        LinkDescriptor* ld = getDescriptor(link);
1006        if( ld == NULL ) {
1007                logging_error("Could not send message. "
1008                        << "Link not found id=" << link.toString());
1009                return -1;
1010        }
1011
1012        // check if the link is up yet, if its an auto link queue message
1013        if( !ld->up ) {
1014                ld->setAutoUsed();
1015                if( ld->autolink ) {
1016                        logging_info("Auto-link " << link.toString() << " not up, queue message");
1017                        Data data = data_serialize( message );
1018                        const_cast<Message*>(message)->dropPayload();
1019                        ld->messageQueue.push_back( new Message(data) );
1020                } else {
1021                        logging_error("Link " << link.toString() << " not up, drop message");
1022                }
1023                return -1;
1024        }
1025
1026        // compile overlay message (has service and node id)
1027        OverlayMsg overmsg( OverlayMsg::typeData );
1028        overmsg.encapsulate( const_cast<Message*>(message) );
1029
1030        // send message over relay/direct/overlay
1031        return send_link( &overmsg, ld->overlayId );
1032}
1033
1034seqnum_t BaseOverlay::sendMessage(const Message* message,
1035        const NodeID& node, const ServiceID& service) {
1036
1037        // find link for node and service
1038        LinkDescriptor* ld = getAutoDescriptor( node, service );
1039
1040        // if we found no link, create an auto link
1041        if( ld == NULL ) {
1042
1043                // debug output
1044                logging_info( "No link to send message to node "
1045                        << node.toString() << " found for service "
1046                        << service.toString() << ". Creating auto link ..."
1047                );
1048
1049                // call base overlay to create a link
1050                LinkID link = establishLink( node, service );
1051                ld = getDescriptor( link );
1052                if( ld == NULL ) {
1053                        logging_error( "Failed to establish auto-link.");
1054                        return -1;
1055                }
1056                ld->autolink = true;
1057
1058                logging_debug( "Auto-link establishment in progress to node "
1059                                << node.toString() << " with link id=" << link.toString() );
1060        }
1061        assert(ld != NULL);
1062
1063        // mark the link as used, as we now send a message through it
1064        ld->setAutoUsed();
1065
1066        // send / queue message
1067        return sendMessage( message, ld->overlayId );
1068}
1069
1070// ----------------------------------------------------------------------------
1071
1072const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1073                const LinkID& link) const {
1074
1075        // return own end-point descriptor
1076        if( link == LinkID::UNSPECIFIED )
1077                return bc->getEndpointDescriptor();
1078
1079        // find link descriptor. not found -> return unspecified
1080        const LinkDescriptor* ld = getDescriptor(link);
1081        if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1082
1083        // return endpoint-descriptor from base communication
1084        return bc->getEndpointDescriptor( ld->communicationId );
1085}
1086
1087const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1088                const NodeID& node) const {
1089
1090        // return own end-point descriptor
1091        if( node == nodeId || node == NodeID::UNSPECIFIED )
1092                return bc->getEndpointDescriptor();
1093
1094        // no joined and request remote descriptor? -> fail!
1095        if( overlayInterface == NULL ) {
1096                logging_error( "overlay interface not set, cannot resolve endpoint" );
1097                return EndpointDescriptor::UNSPECIFIED();
1098        }
1099
1100        // resolve end-point descriptor from the base-overlay routing table
1101        const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1102        if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1103
1104        // see if we can find the node in our own table
1105        BOOST_FOREACH(const LinkDescriptor* ld, links){
1106                if(ld->remoteNode != node) continue;
1107                const EndpointDescriptor& ep = bc->getEndpointDescriptor(ld->communicationId);
1108                if(ep.toString().size()==0) continue;
1109                if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1110        }
1111
1112        return EndpointDescriptor::UNSPECIFIED();
1113}
1114
1115// ----------------------------------------------------------------------------
1116
1117bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1118        sideport = _sideport;
1119        _sideport->configure( this );
1120}
1121
1122bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1123        sideport = &SideportListener::DEFAULT;
1124}
1125
1126// ----------------------------------------------------------------------------
1127
1128bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1129        logging_debug( "binding communication listener " << listener
1130                        << " on serviceid " << sid.toString() );
1131
1132        if( communicationListeners.contains( sid ) ) {
1133                logging_error( "some listener already registered for service id "
1134                                << sid.toString() );
1135                return false;
1136        }
1137
1138        communicationListeners.registerItem( listener, sid );
1139        return true;
1140}
1141
1142
1143bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1144        logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1145
1146        if( !communicationListeners.contains( sid ) ) {
1147                logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1148                return false;
1149        }
1150
1151        if( communicationListeners.get(sid) != listener ) {
1152                logging_warn( "listener bound to service id " << sid.toString()
1153                                << " is different than listener trying to unbind" );
1154                return false;
1155        }
1156
1157        communicationListeners.unregisterItem( sid );
1158        return true;
1159}
1160
1161// ----------------------------------------------------------------------------
1162
1163bool BaseOverlay::bind(NodeListener* listener) {
1164        logging_debug( "Binding node listener " << listener );
1165
1166        // already bound? yes-> warning
1167        NodeListenerVector::iterator i =
1168                find( nodeListeners.begin(), nodeListeners.end(), listener );
1169        if( i != nodeListeners.end() ) {
1170                logging_warn("Node listener " << listener << " is already bound!" );
1171                return false;
1172        }
1173
1174        // no-> add
1175        nodeListeners.push_back( listener );
1176        return true;
1177}
1178
1179bool BaseOverlay::unbind(NodeListener* listener) {
1180        logging_debug( "Unbinding node listener " << listener );
1181
1182        // already unbound? yes-> warning
1183        NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1184        if( i == nodeListeners.end() ) {
1185                logging_warn( "Node listener " << listener << " is not bound!" );
1186                return false;
1187        }
1188
1189        // no-> remove
1190        nodeListeners.erase( i );
1191        return true;
1192}
1193
1194// ----------------------------------------------------------------------------
1195
1196void BaseOverlay::onLinkUp(const LinkID& id,
1197        const address_v* local, const address_v* remote) {
1198        logging_debug( "Link up with base communication link id=" << id );
1199
1200        // get descriptor for link
1201        LinkDescriptor* ld = getDescriptor(id, true);
1202
1203        // handle bootstrap link we initiated
1204        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1205                logging_info(
1206                        "Join has been initiated by me and the link is now up. " <<
1207                        "Sending out join request for SpoVNet " << spovnetId.toString()
1208                );
1209
1210                // send join request message
1211                OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1212                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1213                JoinRequest joinRequest( spovnetId, nodeId );
1214                overlayMsg.encapsulate( &joinRequest );
1215                bc->sendMessage( id, &overlayMsg );
1216                return;
1217        }
1218
1219        // no link found? -> link establishment from remote, add one!
1220        if (ld == NULL) {
1221                ld = addDescriptor( id );
1222                logging_info( "onLinkUp (remote request) descriptor: " << ld );
1223
1224                // update descriptor
1225                ld->fromRemote = true;
1226                ld->communicationId = id;
1227                ld->communicationUp = true;
1228                ld->setAutoUsed();
1229                ld->setAlive();
1230
1231                // in this case, do not inform listener, since service it unknown
1232                // -> wait for update message!
1233
1234        // link mapping found? -> send update message with node-id and service id
1235        } else {
1236                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1237
1238                // update descriptor
1239                ld->setAutoUsed();
1240                ld->setAlive();
1241                ld->communicationUp = true;
1242                ld->fromRemote = false;
1243
1244                // if link is a relayed link->convert to direct link
1245                if (ld->relayed) {
1246                        logging_info( "Converting to direct link: " << ld );
1247                        ld->up = true;
1248                        ld->relayed = false;
1249                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1250                        overMsg.setSourceLink( ld->overlayId );
1251                        overMsg.setDestinationLink( ld->remoteLink );
1252                        send_link( &overMsg, ld->overlayId );
1253                } else {
1254                        // note: necessary to validate the link on the remote side!
1255                        logging_info( "Sending out update" <<
1256                                " for service " << ld->service.toString() <<
1257                                " with local node id " << nodeId.toString() <<
1258                                " on link " << ld->overlayId.toString() );
1259
1260                        // compile and send update message
1261                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1262                        overlayMsg.setSourceLink(ld->overlayId);
1263                        overlayMsg.setAutoLink( ld->autolink );
1264                        send_link( &overlayMsg, ld->overlayId, true );
1265                }
1266        }
1267}
1268
1269void BaseOverlay::onLinkDown(const LinkID& id,
1270        const address_v* local, const address_v* remote) {
1271
1272        // erase bootstrap links
1273        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1274        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1275
1276        // get descriptor for link
1277        LinkDescriptor* ld = getDescriptor(id, true);
1278        if ( ld == NULL ) return; // not found? ->ignore!
1279        logging_info( "onLinkDown descriptor: " << ld );
1280
1281        // removing relay link information
1282        removeRelayLink(ld->overlayId);
1283
1284        // inform listeners about link down
1285        ld->communicationUp = false;
1286        if (!ld->service.isUnspecified()) {
1287                getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1288                sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1289        }
1290
1291        // delete all queued messages (auto links)
1292        if( ld->messageQueue.size() > 0 ) {
1293                logging_warn( "Dropping link " << id.toString() << " that has "
1294                                << ld->messageQueue.size() << " waiting messages" );
1295                ld->flushQueue();
1296        }
1297
1298        // erase mapping
1299        eraseDescriptor(ld->overlayId);
1300}
1301
1302void BaseOverlay::onLinkChanged(const LinkID& id,
1303        const address_v* oldlocal, const address_v* newlocal,
1304        const address_v* oldremote, const address_v* newremote) {
1305
1306        // get descriptor for link
1307        LinkDescriptor* ld = getDescriptor(id, true);
1308        if ( ld == NULL ) return; // not found? ->ignore!
1309        logging_debug( "onLinkChanged descriptor: " << ld );
1310
1311        // inform listeners
1312        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1313        sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1314
1315        // autolinks: refresh timestamp
1316        ld->setAutoUsed();
1317}
1318
1319void BaseOverlay::onLinkFail(const LinkID& id,
1320        const address_v* local, const address_v* remote) {
1321        logging_debug( "Link fail with base communication link id=" << id );
1322
1323        // erase bootstrap links
1324        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1325        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1326
1327        // get descriptor for link
1328        LinkDescriptor* ld = getDescriptor(id, true);
1329        if ( ld == NULL ) return; // not found? ->ignore!
1330        logging_debug( "Link failed id=" << ld->overlayId.toString() );
1331
1332        // inform listeners
1333        ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1334        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1335}
1336
1337void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1338        const address_v* remote, const QoSParameterSet& qos) {
1339        logging_debug( "Link quality changed with base communication link id=" << id );
1340
1341        // get descriptor for link
1342        LinkDescriptor* ld = getDescriptor(id, true);
1343        if ( ld == NULL ) return; // not found? ->ignore!
1344        logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1345}
1346
1347bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1348        const address_v* remote ) {
1349        logging_debug("Accepting link request from " << remote->to_string() );
1350        return true;
1351}
1352
1353/// handles a message from base communication
1354bool BaseOverlay::receiveMessage(const Message* message,
1355                const LinkID& link, const NodeID& ) {
1356        // get descriptor for link
1357        LinkDescriptor* ld = getDescriptor( link, true );
1358        return handleMessage( message, ld, link );
1359}
1360
1361// ----------------------------------------------------------------------------
1362
1363/// Handle spovnet instance join requests
1364bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1365
1366        // decapsulate message
1367        JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1368        logging_info( "Received join request for spovnet " <<
1369                        joinReq->getSpoVNetID().toString() );
1370
1371        // check spovnet id
1372        if( joinReq->getSpoVNetID() != spovnetId ) {
1373                logging_error(
1374                        "Received join request for spovnet we don't handle " <<
1375                        joinReq->getSpoVNetID().toString() );
1376                return false;
1377        }
1378
1379        // TODO: here you can implement mechanisms to deny joining of a node
1380        bool allow = true;
1381        logging_info( "Sending join reply for spovnet " <<
1382                        spovnetId.toString() << " to node " <<
1383                        overlayMsg->getSourceNode().toString() <<
1384                        ". Result: " << (allow ? "allowed" : "denied") );
1385        joiningNodes.push_back( overlayMsg->getSourceNode() );
1386
1387        // return overlay parameters
1388        assert( overlayInterface != NULL );
1389        logging_debug( "Using bootstrap end-point "
1390                << getEndpointDescriptor().toString() )
1391        OverlayParameterSet parameters = overlayInterface->getParameters();
1392        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1393                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1394        JoinReply replyMsg( spovnetId, parameters,
1395                        allow, getEndpointDescriptor() );
1396        retmsg.encapsulate(&replyMsg);
1397        bc->sendMessage( bcLink, &retmsg );
1398
1399        return true;
1400}
1401
1402/// Handle replies to spovnet instance join requests
1403bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1404        // decapsulate message
1405        logging_debug("received join reply message");
1406        JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1407
1408        // correct spovnet?
1409        if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1410                logging_error( "Received SpoVNet join reply for " <<
1411                                replyMsg->getSpoVNetID().toString() <<
1412                                " != " << spovnetId.toString() );
1413                delete replyMsg;
1414                return false;
1415        }
1416
1417        // access granted? no -> fail
1418        if( !replyMsg->getJoinAllowed() ) {
1419                logging_error( "Our join request has been denied" );
1420
1421                // drop initiator link
1422                if( !bcLink.isUnspecified() ){
1423                        bc->dropLink( bcLink );
1424
1425                        vector<LinkID>::iterator it = std::find(
1426                                        bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1427                        if( it != bootstrapLinks.end() )
1428                                bootstrapLinks.erase(it);
1429                }
1430
1431                // inform all registered services of the event
1432                BOOST_FOREACH( NodeListener* i, nodeListeners )
1433                        i->onJoinFailed( spovnetId );
1434
1435                delete replyMsg;
1436                return true;
1437        }
1438
1439        // access has been granted -> continue!
1440        logging_info("Join request has been accepted for spovnet " <<
1441                        spovnetId.toString() );
1442
1443        logging_debug( "Using bootstrap end-point "
1444                << replyMsg->getBootstrapEndpoint().toString() );
1445
1446        // create overlay structure from spovnet parameter set
1447        // if we have not boostrapped yet against some other node
1448        if( overlayInterface == NULL ){
1449
1450                logging_debug("first-time bootstrapping");
1451
1452                overlayInterface = OverlayFactory::create(
1453                        *this, replyMsg->getParam(), nodeId, this );
1454
1455                // overlay structure supported? no-> fail!
1456                if( overlayInterface == NULL ) {
1457                        logging_error( "overlay structure not supported" );
1458
1459                        if( !bcLink.isUnspecified() ){
1460                                bc->dropLink( bcLink );
1461
1462                                vector<LinkID>::iterator it = std::find(
1463                                                bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1464                                if( it != bootstrapLinks.end() )
1465                                        bootstrapLinks.erase(it);
1466                        }
1467
1468                        // inform all registered services of the event
1469                        BOOST_FOREACH( NodeListener* i, nodeListeners )
1470                        i->onJoinFailed( spovnetId );
1471
1472                        delete replyMsg;
1473                        return true;
1474                }
1475
1476                // everything ok-> join the overlay!
1477                state = BaseOverlayStateCompleted;
1478                overlayInterface->createOverlay();
1479
1480                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1481                overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1482
1483                // update ovlvis
1484                //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1485
1486                // inform all registered services of the event
1487                BOOST_FOREACH( NodeListener* i, nodeListeners )
1488                        i->onJoinCompleted( spovnetId );
1489
1490                delete replyMsg;
1491
1492        } else {
1493
1494                // this is not the first bootstrap, just join the additional node
1495                logging_debug("not first-time bootstrapping");
1496                overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1497                overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1498
1499                delete replyMsg;
1500
1501        } // if( overlayInterface == NULL )
1502
1503        return true;
1504}
1505
1506
1507bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1508        // get service
1509        const ServiceID& service = overlayMsg->getService();
1510        logging_debug( "Received data for service " << service.toString()
1511                << " on link " << overlayMsg->getDestinationLink().toString() );
1512
1513        // delegate data message
1514        getListener(service)->onMessage(
1515                overlayMsg,
1516                overlayMsg->getSourceNode(),
1517                overlayMsg->getDestinationLink()
1518        );
1519
1520        return true;
1521}
1522
1523
1524bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1525
1526        if( ld == NULL ) {
1527                logging_warn( "received overlay update message for link for "
1528                                << "which we have no mapping" );
1529                return false;
1530        }
1531        logging_info("Received type update message on link " << ld );
1532
1533        // update our link mapping information for this link
1534        bool changed =
1535                ( ld->remoteNode != overlayMsg->getSourceNode() )
1536                || ( ld->service != overlayMsg->getService() );
1537
1538        // set parameters
1539        ld->up         = true;
1540        ld->remoteNode = overlayMsg->getSourceNode();
1541        ld->remoteLink = overlayMsg->getSourceLink();
1542        ld->service    = overlayMsg->getService();
1543        ld->autolink   = overlayMsg->isAutoLink();
1544
1545        // if our link information changed, we send out an update, too
1546        if( changed ) {
1547                overlayMsg->swapRoles();
1548                overlayMsg->setSourceNode(nodeId);
1549                overlayMsg->setSourceLink(ld->overlayId);
1550                overlayMsg->setService(ld->service);
1551                send( overlayMsg, ld );
1552        }
1553
1554        // service registered? no-> error!
1555        if( !communicationListeners.contains( ld->service ) ) {
1556                logging_warn( "Link up: event listener has not been registered" );
1557                return false;
1558        }
1559
1560        // default or no service registered?
1561        CommunicationListener* listener = communicationListeners.get( ld->service );
1562        if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1563                logging_warn("Link up: event listener is default or null!" );
1564                return true;
1565        }
1566
1567        // update descriptor
1568        ld->listener = listener;
1569        ld->setAutoUsed();
1570        ld->setAlive();
1571
1572        // ask the service whether it wants to accept this link
1573        if( !listener->onLinkRequest(ld->remoteNode) ) {
1574
1575                logging_debug("Link id=" << ld->overlayId.toString() <<
1576                        " has been denied by service " << ld->service.toString() << ", dropping link");
1577
1578                // prevent onLinkDown calls to the service
1579                ld->listener = &CommunicationListener::DEFAULT;
1580
1581                // drop the link
1582                dropLink( ld->overlayId );
1583                return true;
1584        }
1585
1586        // set link up
1587        ld->up = true;
1588        logging_info( "Link has been accepted by service and is up: " << ld );
1589
1590        // auto links: link has been accepted -> send queued messages
1591        if( ld->messageQueue.size() > 0 ) {
1592                logging_info( "Sending out queued messages on link " << ld );
1593                BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1594                        sendMessage( msg, ld->overlayId );
1595                        delete msg;
1596                }
1597                ld->messageQueue.clear();
1598        }
1599
1600        // call the notification functions
1601        listener->onLinkUp( ld->overlayId, ld->remoteNode );
1602        sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1603
1604        return true;
1605}
1606
1607/// handle a link request and reply
1608bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1609        logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1610
1611        //TODO: Check if a request has already been sent using getSourceLink() ...
1612
1613        // create link descriptor
1614        LinkDescriptor* ldn = addDescriptor();
1615
1616        // flags
1617        ldn->up = true;
1618        ldn->fromRemote = true;
1619        ldn->relayed = true;
1620
1621        // parameters
1622        ldn->service = overlayMsg->getService();
1623        ldn->listener = getListener(ldn->service);
1624        ldn->remoteNode = overlayMsg->getSourceNode();
1625        ldn->remoteLink = overlayMsg->getSourceLink();
1626
1627        // update time-stamps
1628        ldn->setAlive();
1629        ldn->setAutoUsed();
1630
1631        // create reply message and send back!
1632        overlayMsg->swapRoles(); // swap source/destination
1633        overlayMsg->setType(OverlayMsg::typeLinkReply);
1634        overlayMsg->setSourceLink(ldn->overlayId);
1635        overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1636        overlayMsg->setRelayed(true);
1637        send( overlayMsg, ld ); // send back to link
1638
1639        // inform listener
1640        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1641
1642        return true;
1643}
1644
1645bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1646
1647        // find link request
1648        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1649
1650        // not found? yes-> drop with error!
1651        if (ldn == NULL) {
1652                logging_error( "No link request pending for "
1653                        << overlayMsg->getDestinationLink().toString() );
1654                return false;
1655        }
1656        logging_debug("Handling link reply for " << ldn )
1657
1658        // check if already up
1659        if (ldn->up) {
1660                logging_warn( "Link already up: " << ldn );
1661                return true;
1662        }
1663
1664        // debug message
1665        logging_debug( "Link request reply received. Establishing link"
1666                << " for service " << overlayMsg->getService().toString()
1667                << " with local id=" << overlayMsg->getDestinationLink()
1668                << " and remote link id=" << overlayMsg->getSourceLink()
1669                << " to " << overlayMsg->getSourceEndpoint().toString()
1670        );
1671
1672        // set local link descriptor data
1673        ldn->up = true;
1674        ldn->relayed = true;
1675        ldn->service = overlayMsg->getService();
1676        ldn->listener = getListener(ldn->service);
1677        ldn->remoteLink = overlayMsg->getSourceLink();
1678        ldn->remoteNode = overlayMsg->getSourceNode();
1679
1680        // update timestamps
1681        ldn->setAlive();
1682        ldn->setAutoUsed();
1683
1684        // auto links: link has been accepted -> send queued messages
1685        if( ldn->messageQueue.size() > 0 ) {
1686                logging_info( "Sending out queued messages on link " <<
1687                        ldn->overlayId.toString() );
1688                BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1689                        sendMessage( msg, ldn->overlayId );
1690                        delete msg;
1691                }
1692                ldn->messageQueue.clear();
1693        }
1694
1695        // inform listeners about new link
1696        ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1697
1698        // try to replace relay link with direct link
1699        ldn->communicationId =
1700                bc->establishLink( overlayMsg->getSourceEndpoint() );
1701
1702        return true;
1703}
1704
1705/// handle a keep-alive message for a link
1706bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1707        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1708        if ( rld != NULL ) {
1709                logging_debug("Keep-Alive for " <<
1710                        overlayMsg->getDestinationLink() );
1711                if (overlayMsg->isRouteRecord())
1712                        rld->routeRecord = overlayMsg->getRouteRecord();
1713                rld->setAlive();
1714                return true;
1715        } else {
1716                logging_error("Keep-Alive for "
1717                                << overlayMsg->getDestinationLink() << ": link unknown." );
1718                return false;
1719        }
1720}
1721
1722/// handle a direct link message
1723bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1724        logging_debug( "Received direct link replacement request" );
1725
1726        /// get destination overlay link
1727        LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1728        if (rld == NULL || ld == NULL) {
1729                logging_error("Direct link replacement: Link "
1730                                << overlayMsg->getDestinationLink() << "not found error." );
1731                return false;
1732        }
1733        logging_info( "Received direct link convert notification for " << rld );
1734
1735        // update information
1736        rld->communicationId = ld->communicationId;
1737        rld->communicationUp = true;
1738        rld->relayed = false;
1739
1740        // mark used and alive!
1741        rld->setAlive();
1742        rld->setAutoUsed();
1743
1744        // erase the original descriptor
1745        eraseDescriptor(ld->overlayId);
1746}
1747
1748/// handles an incoming message
1749bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1750        const LinkID bcLink ) {
1751        logging_debug( "Handling message: " << message->toString());
1752
1753        // decapsulate overlay message
1754        OverlayMsg* overlayMsg =
1755                const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1756        if( overlayMsg == NULL ) return false;
1757
1758        // increase number of hops
1759        overlayMsg->increaseNumHops();
1760
1761        // refresh relay information
1762        refreshRelayInformation( overlayMsg, ld );
1763
1764        // update route record
1765        overlayMsg->addRouteRecord(nodeId);
1766
1767        // handle dht messages (do not route)
1768        if (overlayMsg->isDHTMessage())
1769                return handleDHTMessage(overlayMsg);
1770
1771        // handle signaling messages (do not route!)
1772        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1773                overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1774                overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1775                delete overlayMsg;
1776                return true;
1777        }
1778
1779        // message for reached destination? no-> route message
1780        if (!overlayMsg->getDestinationNode().isUnspecified() &&
1781                 overlayMsg->getDestinationNode() != nodeId ) {
1782                logging_debug("Routing message "
1783                        << " from " << overlayMsg->getSourceNode()
1784                        << " to " << overlayMsg->getDestinationNode()
1785                );
1786                route( overlayMsg );
1787                delete overlayMsg;
1788                return true;
1789        }
1790
1791        // handle DHT response messages
1792        if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
1793                bool ret = handleDHTMessage(overlayMsg);
1794                delete overlayMsg;
1795                return ret;
1796        }
1797
1798
1799        // handle base overlay message
1800        bool ret = false; // return value
1801        switch ( overlayMsg->getType() ) {
1802
1803                // data transport messages
1804                case OverlayMsg::typeData:
1805                        ret = handleData(overlayMsg, ld);                       break;
1806
1807                // overlay setup messages
1808                case OverlayMsg::typeJoinRequest:
1809                        ret = handleJoinRequest(overlayMsg, bcLink );   break;
1810                case OverlayMsg::typeJoinReply:
1811                        ret = handleJoinReply(overlayMsg, bcLink );     break;
1812
1813                // link specific messages
1814                case OverlayMsg::typeLinkRequest:
1815                        ret = handleLinkRequest(overlayMsg, ld );       break;
1816                case OverlayMsg::typeLinkReply:
1817                        ret = handleLinkReply(overlayMsg, ld );         break;
1818                case OverlayMsg::typeLinkUpdate:
1819                        ret = handleLinkUpdate(overlayMsg, ld );        break;
1820                case OverlayMsg::typeLinkAlive:
1821                        ret = handleLinkAlive(overlayMsg, ld );         break;
1822                case OverlayMsg::typeLinkDirect:
1823                        ret = handleLinkDirect(overlayMsg, ld );        break;
1824
1825                // handle unknown message type
1826                default: {
1827                        logging_error( "received message in invalid state! don't know " <<
1828                                "what to do with this message of type " << overlayMsg->getType() );
1829                        ret = false;
1830                        break;
1831                }
1832        }
1833
1834        // free overlay message and return value
1835        delete overlayMsg;
1836        return ret;
1837}
1838
1839// ----------------------------------------------------------------------------
1840
1841void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1842
1843        logging_debug( "broadcasting message to all known nodes " <<
1844                        "in the overlay from service " + service.toString() );
1845
1846        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1847        OverlayInterface::NodeList::iterator i = nodes.begin();
1848        for(; i != nodes.end(); i++ ) {
1849                if( *i == nodeId) continue; // don't send to ourselfs
1850                sendMessage( message, *i, service );
1851        }
1852}
1853
1854/// return the overlay neighbors
1855vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1856        // the known nodes _can_ also include our node, so we remove ourself
1857        vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1858        vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1859        if( i != nodes.end() ) nodes.erase( i );
1860        return nodes;
1861}
1862
1863const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1864        if( lid == LinkID::UNSPECIFIED ) return nodeId;
1865        const LinkDescriptor* ld = getDescriptor(lid);
1866        if( ld == NULL ) return NodeID::UNSPECIFIED;
1867        else return ld->remoteNode;
1868}
1869
1870vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1871        vector<LinkID> linkvector;
1872        BOOST_FOREACH( LinkDescriptor* ld, links ) {
1873                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1874                        linkvector.push_back( ld->overlayId );
1875                }
1876        }
1877        return linkvector;
1878}
1879
1880
1881void BaseOverlay::onNodeJoin(const NodeID& node) {
1882        JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1883        if( i == joiningNodes.end() ) return;
1884
1885        logging_info( "node has successfully joined baseoverlay and overlay structure "
1886                        << node.toString() );
1887
1888        joiningNodes.erase( i );
1889}
1890
1891void BaseOverlay::eventFunction() {
1892        stabilizeRelays();
1893        stabilizeLinks();
1894}
1895
1896
1897// ----------------------------------------------------------------------------
1898
1899
1900
1901/// stabilize DHT state
1902void BaseOverlay::stabilizeDHT() {
1903
1904}
1905
1906// handle DHT messages
1907bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
1908
1909        // decapsulate message
1910        logging_debug("received DHT message");
1911        DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
1912
1913        // handle DHT data message
1914        if (msg->getType()==OverlayMsg::typeDHTData) {
1915                const ServiceID& service = msg->getService();
1916                logging_debug( "Received DHT data for service " << service.toString() );
1917
1918                // delegate data message
1919                getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
1920                return true;
1921        }
1922
1923        // route message to closest node
1924        if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
1925                logging_debug("Routing DHT message to closest node "
1926                        << " from " << msg->getSourceNode()
1927                        << " to " << msg->getDestinationNode()
1928                );
1929                route( msg );
1930                delete msg;
1931                return true;
1932        }
1933
1934        // now, we are the closest node...
1935        switch (msg->getType()) {
1936        case OverlayMsg::typeDHTPut: {
1937                BOOST_FOREACH( Data value, dhtMsg->getValues() )
1938                        dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
1939                break;
1940        }
1941
1942        case OverlayMsg::typeDHTGet: {
1943                vector<Data> vect = dht->get(dhtMsg->getKey());
1944                OverlayMsg omsg(*msg);
1945                omsg.swapRoles();
1946                omsg.setType(OverlayMsg::typeDHTData);
1947                DHTMessage dhtmsg(dhtMsg->getKey(), vect);
1948                omsg.encapsulate(&dhtmsg);
1949                this->send(&omsg, omsg.getDestinationNode());
1950                break;
1951        }
1952
1953        case OverlayMsg::typeDHTRemove: {
1954                if (dhtMsg->hasValues()) {
1955                        BOOST_FOREACH( Data value, dhtMsg->getValues() )
1956                                        dht->remove(dhtMsg->getKey(), value );
1957                } else
1958                        dht->remove( dhtMsg->getKey() );
1959                break;
1960        }
1961
1962        default:
1963                logging_error("DHT Message type unknown.");
1964                return false;
1965        }
1966        delete msg;
1967        return true;
1968}
1969
1970/// put a value to the DHT with a ttl given in seconds
1971void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) {
1972        // calculate hash
1973        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1974        OverlayMsg msg(OverlayMsg::typeDHTPut);
1975        DHTMessage dhtmsg(key,value);
1976        dhtmsg.setTTL(ttl);
1977        msg.setDestinationNode(dest);
1978        msg.encapsulate( &dhtmsg );
1979        send(&msg, dest);
1980}
1981
1982/// removes a key value pair from the DHT
1983void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
1984        // calculate hash
1985        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1986        OverlayMsg msg(OverlayMsg::typeDHTRemove);
1987        DHTMessage dhtmsg(key,value);
1988        msg.setDestinationNode(dest);
1989        msg.encapsulate( &dhtmsg );
1990        send(&msg, dest);
1991}
1992
1993/// removes all data stored at the given key
1994void BaseOverlay::dhtRemove( const Data& key ) {
1995        // calculate hash
1996        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1997        OverlayMsg msg(OverlayMsg::typeDHTRemove);
1998        DHTMessage dhtmsg(key);
1999        msg.setDestinationNode(dest);
2000        msg.encapsulate( &dhtmsg );
2001        send(&msg, dest);
2002}
2003
2004/// requests data stored using key
2005void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
2006        // calculate hash
2007        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2008        OverlayMsg msg(OverlayMsg::typeDHTRemove);
2009        DHTMessage dhtmsg(key);
2010        msg.setDestinationNode(dest);
2011        msg.setService(service);
2012        msg.encapsulate( &dhtmsg );
2013        send(&msg, dest);
2014}
2015
2016
2017}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.