An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/overlay/modules/chord/Chord.cpp @ 12775

Last change on this file since 12775 was 12775, checked in by hock@…, 9 years ago

Chord: catch message_not_sent exception and print a warning (instead of crashing)

File size: 20.2 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "ariba/overlay/BaseOverlay.h"
40#include "ariba/overlay/messages/OverlayMsg.h"
41
42#include "Chord.h"
43#include "detail/chord_routing_table.hpp"
44
45//#include "messages/Discovery.h"  // XXX DEPRECATED
46
47namespace ariba {
48namespace overlay {
49
50enum signalMessageTypes {
51        typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
52        typeLeave = OverlayMsg::typeSignalingStart + 0x02,
53};
54
55typedef chord_routing_table::item route_item;
56
57using ariba::transport::system_priority;
58
59use_logging_cpp( Chord );
60
61
62////// Messages
63struct DiscoveryMessage
64{
65    /**
66     * DiscoveryMessage
67     * - type
68     * - data
69     * - Endpoint
70     */
71
72    // type enum
73    enum type_ {
74        invalid = 0,
75        normal = 1,
76        successor = 2,
77        predecessor = 3
78    };
79
80   
81    // data
82    uint8_t type;
83    uint8_t ttl;
84    EndpointDescriptor endpoint;
85
86    // serialize
87    reboost::message_t serialize()
88    {
89        // serialize endpoint
90        reboost::message_t msg = endpoint.serialize();
91       
92        // serialize type and ttl
93        uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
94        buff1[0] = type;
95        buff1[1] = ttl;
96       
97        return msg;
98    }
99   
100    //deserialize
101    reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
102    {
103        // deserialize type and ttl
104        const uint8_t* bytes = buff.data();
105        type = bytes[0];
106        ttl = bytes[1];
107       
108        // deserialize endpoint
109        return endpoint.deserialize(buff(2*sizeof(uint8_t)));
110    }
111};
112
113
114Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
115                OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
116        OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
117
118        // create routing table
119        this->table = new chord_routing_table(_nodeid, 4);
120        orphan_removal_counter = 0;
121        discovery_count = 0;
122        stabilize_counter = 0;
123        stabilize_finger = 0;
124}
125
126Chord::~Chord() {
127
128        // delete routing table
129        delete table;
130}
131
132/// helper: sets up a link using the base overlay
133LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
134
135        // check if we already have a connection
136        for (size_t i=0; i<table->size(); i++)
137                if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
138                        return LinkID::UNSPECIFIED;
139
140        // check if we are already trying to establish a link
141        for (size_t i=0; i<pending.size(); i++)
142                if ( pending[i] == remote ) {
143                        logging_debug("Already trying to establish a link to node "
144                                << remote.toString() );
145                        return LinkID::UNSPECIFIED;
146                }
147
148        // adding node to list of pending connections
149        pending.push_back( remote );
150
151        logging_info("Request to setup link to " << endpoint.toString() );
152
153        // establish link via base overlay
154        return baseoverlay.establishLink( endpoint, remote,
155                        OverlayInterface::OVERLAY_SERVICE_ID );
156}
157
158/// helper: sends a message using the "base overlay"
159void Chord::send( OverlayMsg* msg, const LinkID& link ) {
160        if (link.isUnspecified())
161        return;
162   
163    try
164    {
165        baseoverlay.send_link( msg, link, system_priority::OVERLAY );
166    }
167    catch ( message_not_sent& e )
168    {
169        logging_warn("Chord: Could not send message over link " << link
170                << ": " << e.what());
171    }
172}
173
174void Chord::send_node( OverlayMsg* message, const NodeID& remote )
175{
176    try
177    {
178        baseoverlay.send( message, remote, system_priority::OVERLAY );
179    }
180    catch ( message_not_sent& e )
181    {
182        logging_warn("Chord: Could not send message to " << remote
183                << ": " << e.what());
184    }
185}
186
187/// sends a discovery message
188void Chord::send_discovery_to(const NodeID& remote, int ttl) {
189        LinkID link = getNextLinkId(remote);
190        if ( remote == nodeid || link.isUnspecified()) return;
191        if ( table->size() == 0 ) return;
192        ttl = 2;
193
194        OverlayMsg msg( typeDiscovery );
195        msg.setRegisterRelay(true);
196       
197        // create DiscoveryMessage
198        DiscoveryMessage dmsg;
199        dmsg.type = DiscoveryMessage::normal;
200        dmsg.ttl = ttl;
201        dmsg.endpoint = baseoverlay.getEndpointDescriptor();
202
203        msg.set_payload_message(dmsg.serialize());
204
205        // send to node
206    try
207    {
208        baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
209    }
210    catch ( message_not_sent& e )
211    {
212        logging_warn("Chord: Could not send message to " << remote
213                << ": " << e.what());
214    }
215}
216
217void Chord::discover_neighbors( const LinkID& link ) {
218        uint8_t ttl = 1;
219       
220    // FIXME try-catch for the send operations
221   
222    // create DiscoveryMessage
223    DiscoveryMessage dmsg;
224    dmsg.ttl = ttl;
225    dmsg.endpoint = baseoverlay.getEndpointDescriptor();
226        {
227                // send predecessor discovery
228                OverlayMsg msg( typeDiscovery );
229                msg.setRegisterRelay(true);
230
231                // set type
232            dmsg.type = DiscoveryMessage::predecessor;
233
234            // send
235            msg.set_payload_message(dmsg.serialize());
236                send(&msg, link);
237        }
238        {
239                // send successor discovery
240                OverlayMsg msg( typeDiscovery );
241//              msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );  // XXX this was redundand, wasn't it?
242                msg.setRegisterRelay(true);
243
244                // set type
245        dmsg.type = DiscoveryMessage::successor;
246
247        // send
248        msg.set_payload_message(dmsg.serialize());
249                send(&msg, link);
250        }
251}
252
253
254void Chord::createOverlay() {
255}
256
257void Chord::deleteOverlay() {
258
259}
260
261void Chord::joinOverlay(const EndpointDescriptor& boot) {
262        logging_info( "joining Chord overlay structure through end-point " <<
263                        (boot.isUnspecified() ? "local" : boot.toString()) );
264
265        // initiator? no->setup first link
266        if (!boot.isUnspecified())
267                bootstrapLinks.push_back( setup(boot) );
268
269        // timer for stabilization management
270//      Timer::setInterval(1000);  // TODO find an appropriate interval!
271        Timer::setInterval(10000);  // XXX testing...
272        Timer::start();
273}
274
275void Chord::leaveOverlay() {
276        Timer::stop();
277        for (size_t i = 0; i < table->size(); i++) {
278                route_item* it = (*table)[i];
279                OverlayMsg msg( typeLeave );
280                send( &msg, it->info );
281        }
282}
283
284/// @see OverlayInterface.h
285const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
286        const route_item* item = table->get(node);
287        if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
288        return baseoverlay.getEndpointDescriptor(item->info);
289}
290
291/// @see OverlayInterface.h
292bool Chord::isClosestNodeTo( const NodeID& node ) {
293        return table->is_closest_to(node);
294}
295
296/// @see OverlayInterface.h
297const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
298        // get next hop
299        const route_item* item = table->get_next_hop(id);
300
301        // returns a unspecified id when this is itself
302        if (item == NULL || item->id == nodeid)
303                return LinkID::UNSPECIFIED;
304
305        /// return routing info
306        return item->info;
307}
308
309std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode(
310    const NodeID& id, int num ) const
311{
312    std::vector<const LinkID*> ret;
313   
314    switch ( num )
315    {
316        // special case: just call »getNextLinkId«
317        case 1:
318        {
319            ret.push_back(&getNextLinkId(id));
320           
321            break;
322        }
323       
324        // * calculate top 2 *
325        case 0:
326        case 2:
327        {
328            std::vector<const route_item*> items = table->get_next_2_hops(id);
329           
330            ret.reserve(items.size());
331           
332            BOOST_FOREACH( const route_item* item, items )
333            {
334                ret.push_back(&item->info);
335            }
336           
337            break;
338        }
339       
340        // NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
341        default:
342        {
343            throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");
344           
345            break;
346        }
347    }
348   
349    return ret;
350}
351
352
353/// @see OverlayInterface.h
354const NodeID& Chord::getNextNodeId( const NodeID& id ) const {
355        // get next hop
356        const route_item* item = table->get_next_hop(id);
357       
358        // return unspecified if no next hop could be found
359        if (item == NULL) {
360                return NodeID::UNSPECIFIED;
361        }
362       
363        return item->id;
364}
365
366OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
367        OverlayInterface::NodeList nodelist;
368
369        if( deep ){
370                // all nodes that I know, fingers, succ/pred
371                for (size_t i = 0; i < table->size(); i++){
372                        if ((*table)[i]->ref_count != 0
373                                        && !(*table)[i]->info.isUnspecified())
374                                nodelist.push_back((*table)[i]->id);
375                }
376        } else {
377                // only succ and pred
378                if( table->get_predesessor() != NULL ){
379                        nodelist.push_back( *(table->get_predesessor()) );
380                }
381                if( table->get_successor() != NULL ){
382                        OverlayInterface::NodeList::iterator i =
383                                std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
384                        if( i == nodelist.end() )
385                                nodelist.push_back( *(table->get_successor()) );
386                }
387        }
388
389        return nodelist;
390}
391
392/// @see CommunicationListener.h
393/// @see OverlayInterface.h
394void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
395        logging_info("link_up: link=" << lnk.toString() << " remote=" <<
396                        remote.toString() );
397        for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
398                if (*i == remote) {
399                        pending.erase(i);
400                        break;
401                }
402
403        if (remote==nodeid) {
404                logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
405                logging_warn("NodeID: " << remote);
406                baseoverlay.dropLink(lnk);
407                return;
408        }
409
410        route_item* item = table->insert(remote);
411
412        // item added to routing table?
413        if (item != NULL) { // yes-> add to routing table
414                logging_info("new routing neighbor: " << remote.toString()
415                                << " with link " << lnk.toString());
416
417                // replace with new link if link is "better"
418                if (item->info!=lnk && item->info.isUnspecified()==false) {
419                        if (baseoverlay.compare( item->info, lnk ) == 1) {
420                                logging_info("Replacing link due to concurrent link establishment.");
421                                baseoverlay.dropLink(item->info);
422                                item->info = lnk;
423                        }
424                } else {
425                        item->info = lnk;
426                }
427
428                // discover neighbors of new overlay neighbor
429                showLinks();
430        } else { // no-> add orphan entry to routing table
431                logging_info("new orphan: " << remote.toString()
432                                << " with link " << lnk.toString());
433                table->insert_orphan(remote)->info = lnk;
434        }
435
436        // erase bootstrap link
437        vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
438        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
439}
440
441/// @see CommunicationListener.h or @see OverlayInterface.h
442void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
443    // XXX logging_debug
444        logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
445                        remote.toString() );
446
447        // remove link from routing table
448        route_item* item = table->get(remote);
449        if (item!=NULL && item->info==lnk) {
450                item->info = LinkID::UNSPECIFIED;
451                table->remove(remote);
452        }
453}
454
455/// @see CommunicationListener.h
456/// @see OverlayInterface.h
457void Chord::onMessage(OverlayMsg* msg,
458        reboost::shared_buffer_t sub_msg,
459        const NodeID& remote,
460                const LinkID& link) {
461
462        // handle messages
463        switch ((signalMessageTypes) msg->getType()) {
464
465        // discovery request
466        case typeDiscovery:
467        {
468                // deserialize discovery message
469                DiscoveryMessage dmsg;
470                dmsg.deserialize(sub_msg);
471               
472                logging_debug("Received discovery message with"
473                            << " src=" << msg->getSourceNode().toString()
474                                << " dst=" << msg->getDestinationNode().toString()
475                                << " ttl=" << (int)dmsg.ttl
476                                << " type=" << (int)dmsg.type
477                );
478
479                // add discovery node id
480                bool found = false;
481                BOOST_FOREACH( NodeID& value, discovery )
482                        if (value == msg->getSourceNode()) {
483                                found = true;
484                                break;
485                        }
486                if (!found) discovery.push_back(msg->getSourceNode());
487
488                // check if source node can be added to routing table and setup link
489                if (msg->getSourceNode() != nodeid)
490                        setup( dmsg.endpoint, msg->getSourceNode() );
491
492                // process discovery message -------------------------- switch start --
493                switch ( dmsg.type )
494                {
495            // normal: route discovery message like every other message
496            case DiscoveryMessage::normal:
497            {
498                // closest node? yes-> split to follow successor and predecessor
499                if ( table->is_closest_to(msg->getDestinationNode()) )
500                {
501                    logging_debug("Discovery split:");
502                    if (!table->get_successor()->isUnspecified())
503                    {
504                        OverlayMsg omsg(*msg);
505                       
506                        dmsg.type = DiscoveryMessage::successor;
507                        omsg.set_payload_message(dmsg.serialize());
508
509                        logging_debug("* Routing to successor "
510                                << table->get_successor()->toString() );
511                        send_node( &omsg, *table->get_successor() );
512                    }
513   
514                    // send predecessor message
515                    if (!table->get_predesessor()->isUnspecified())
516                    {
517                        OverlayMsg omsg(*msg);
518                       
519                        dmsg.type = DiscoveryMessage::predecessor;
520                        omsg.set_payload_message(dmsg.serialize());
521
522                        logging_debug("* Routing to predecessor "
523                                << table->get_predesessor()->toString() );
524                        send_node( &omsg, *table->get_predesessor() );
525                    }
526                }
527                // no-> route message
528                else
529                {
530                    baseoverlay.route( msg );
531                }
532                break;
533            }
534   
535            // successor mode: follow the successor until TTL is zero
536            case DiscoveryMessage::successor:
537            case DiscoveryMessage::predecessor:
538            {
539                // reached destination? no->forward!
540                if (msg->getDestinationNode() != nodeid)
541                {
542                    OverlayMsg omsg(*msg);
543                    omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
544                   
545                    omsg.set_payload_message(dmsg.serialize());
546                   
547                    baseoverlay.route( &omsg );
548                    break;
549                }
550   
551                // time to live ended? yes-> stop routing
552                if (dmsg.ttl == 0 || dmsg.ttl > 10) break;
553   
554                // decrease time-to-live
555                dmsg.ttl--;
556   
557                const route_item* item = NULL;
558                if (dmsg.type == DiscoveryMessage::successor &&
559                        table->get_successor() != NULL)
560                {
561                    item = table->get(*table->get_successor());
562                }
563                else if (table->get_predesessor() != NULL)
564                {
565                        item = table->get(*table->get_predesessor());
566                }
567                if (item == NULL)
568                    break;
569   
570                logging_debug("Routing discovery message to succ/pred "
571                    << item->id.toString() );
572                OverlayMsg omsg(*msg);
573                omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
574                omsg.setDestinationNode(item->id);
575               
576                omsg.set_payload_message(dmsg.serialize());
577               
578                send_node( &omsg, omsg.getDestinationNode() );
579                break;
580            }
581            case DiscoveryMessage::invalid:
582                break;
583   
584            default:
585                break;
586            }
587            // process discovery message ---------------------------- switch end --
588   
589            break;
590        }
591   
592        // leave
593        case typeLeave: {
594            if (link!=LinkID::UNSPECIFIED) {
595                route_item* item = table->get(remote);
596                if (item!=NULL) item->info = LinkID::UNSPECIFIED;
597                table->remove(remote);
598                baseoverlay.dropLink(link);
599            }
600            break;
601        }
602        }
603}
604
605void Chord::eventFunction() {
606        stabilize_counter++;
607        if (stabilize_counter < 0 || stabilize_counter == 2) {
608
609                // reset counter
610                stabilize_counter = 0;
611
612                // clear pending connections
613                pending.clear();
614
615                // get number of real neighbors
616                size_t numNeighbors = 0;
617                for (size_t i = 0; i < table->size(); i++) {
618                        route_item* it = (*table)[i];
619                        if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
620                }
621                logging_info("Running stabilization: #links="
622                                << table->size() << " #neighbors=" << numNeighbors );
623
624                // updating neighbors
625                logging_debug("Discover new ring neighbors");
626                for (size_t i=0; i<table->size(); i++) {
627                        LinkID id = (*table)[i]->info;
628                        if (!id.isUnspecified()) discover_neighbors(id);
629                }
630
631                // sending discovery
632                logging_debug("Sending discovery message to my neighbors and fingers");
633                stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
634                const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
635                if (disc != nodeid)
636                        send_discovery_to(disc);
637
638                // remove orphan links
639                orphan_removal_counter++;
640                if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
641                        logging_info("Discovered nodes: ");
642                        BOOST_FOREACH( NodeID& id, discovery )
643                                logging_info("* " << id.toString());
644                        discovery.clear();
645                        logging_info("Running orphan removal");
646                        orphan_removal_counter = 0;
647                        for (size_t i = 0; i < table->size(); i++) {
648                                route_item* it = (*table)[i];
649                                if (it->ref_count == 0 && !it->info.isUnspecified()) {
650                                        logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
651                                        table->insert(it->id);
652                                        if (it->ref_count==0) {
653                                                LinkID id = it->info;
654                                                it->info = LinkID::UNSPECIFIED;
655                                                baseoverlay.dropLink(id);
656                                        }
657                                }
658                        }
659                }
660        }
661}
662
663void Chord::showLinks() {
664        logging_info("--- chord routing information ----------------------------------");
665        logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
666                table->get_predesessor()->toString()) );
667        logging_info("node_id    : " << nodeid.toString() );
668        logging_info("successor  : " << (table->get_successor()==NULL? "<none>" :
669                table->get_successor()->toString()));
670        logging_info("----------------------------------------------------------------");
671}
672
673/// @see OverlayInterface.h
674std::string Chord::debugInformation() const {
675        std::ostringstream s;
676        s << "protocol   : Chord" << endl;
677        s << "node_id    : " << nodeid.toString() << endl;
678        s << "predecessor: " << (table->get_predesessor()==NULL? "<none>" :
679                table->get_predesessor()->toString()) << endl;
680        s << "successor  : " << (table->get_successor()==NULL? "<none>" :
681                table->get_successor()->toString()) << endl;
682        s << "nodes: " << endl;
683        for (size_t i = 0; i < table->size(); i++) {
684                route_item* it = (*table)[i];
685                if (it->ref_count != 0 && !it->info.isUnspecified()) {
686                        s << it->id.toString().substr(0,6)
687                          << " using " << it->info.toString().substr(0,6) << endl;
688                }
689        }
690        return s.str();
691}
692
693
694
695}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.