Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/modules/chord/Chord.cpp

    r10572 r12060  
    4343#include "detail/chord_routing_table.hpp"
    4444
    45 #include "messages/Discovery.h"
     45//#include "messages/Discovery.h"  // XXX DEPRECATED
    4646
    4747namespace ariba {
     
    5555typedef chord_routing_table::item route_item;
    5656
     57using ariba::transport::system_priority;
     58
    5759use_logging_cpp( Chord );
     60
     61
     62////// Messages
     63struct DiscoveryMessage
     64{
     65    /**
     66     * DiscoveryMessage
     67     * - type
     68     * - data
     69     * - Endpoint
     70     */
     71
     72    // type enum
     73    enum type_ {
     74        invalid = 0,
     75        normal = 1,
     76        successor = 2,
     77        predecessor = 3
     78    };
     79
     80   
     81    // data
     82    uint8_t type;
     83    uint8_t ttl;
     84    EndpointDescriptor endpoint;
     85
     86    // serialize
     87    reboost::message_t serialize()
     88    {
     89        // serialize endpoint
     90        reboost::message_t msg = endpoint.serialize();
     91       
     92        // serialize type and ttl
     93        uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
     94        buff1[0] = type;
     95        buff1[1] = ttl;
     96       
     97        return msg;
     98    }
     99   
     100    //deserialize
     101    reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
     102    {
     103        // deserialize type and ttl
     104        const uint8_t* bytes = buff.data();
     105        type = bytes[0];
     106        ttl = bytes[1];
     107       
     108        // deserialize endpoint
     109        return endpoint.deserialize(buff(2*sizeof(uint8_t)));
     110    }
     111};
     112
    58113
    59114Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
     
    102157
    103158/// helper: sends a message using the "base overlay"
    104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
    105         if (link.isUnspecified()) return 0;
    106         return baseoverlay.send_link( msg, link );
     159void Chord::send( OverlayMsg* msg, const LinkID& link ) {
     160        if (link.isUnspecified())
     161        return;
     162   
     163        baseoverlay.send_link( msg, link, system_priority::OVERLAY );
     164}
     165
     166void Chord::send_node( OverlayMsg* message, const NodeID& remote )
     167{
     168    try
     169    {
     170        baseoverlay.send( message, remote, system_priority::OVERLAY );
     171    }
     172    catch ( message_not_sent& e )
     173    {
     174        logging_warn("Chord: Could not send message to " << remote
     175                << ": " << e.what());
     176    }
    107177}
    108178
     
    116186        OverlayMsg msg( typeDiscovery );
    117187        msg.setRegisterRelay(true);
    118         Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() );
    119         msg.encapsulate(&dmsg);
     188       
     189        // create DiscoveryMessage
     190        DiscoveryMessage dmsg;
     191        dmsg.type = DiscoveryMessage::normal;
     192        dmsg.ttl = ttl;
     193        dmsg.endpoint = baseoverlay.getEndpointDescriptor();
     194
     195        msg.set_payload_message(dmsg.serialize());
    120196
    121197        // send to node
    122         baseoverlay.send_node( &msg, remote );
     198    try
     199    {
     200        baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
     201    }
     202    catch ( message_not_sent& e )
     203    {
     204        logging_warn("Chord: Could not send message to " << remote
     205                << ": " << e.what());
     206    }
    123207}
    124208
    125209void Chord::discover_neighbors( const LinkID& link ) {
    126210        uint8_t ttl = 1;
     211       
     212    // FIXME try-catch for the send operations
     213   
     214    // create DiscoveryMessage
     215    DiscoveryMessage dmsg;
     216    dmsg.ttl = ttl;
     217    dmsg.endpoint = baseoverlay.getEndpointDescriptor();
    127218        {
    128219                // send predecessor discovery
    129220                OverlayMsg msg( typeDiscovery );
    130221                msg.setRegisterRelay(true);
    131                 Discovery dmsg( Discovery::predecessor, ttl,
    132                         baseoverlay.getEndpointDescriptor() );
    133                 msg.encapsulate(&dmsg);
     222
     223                // set type
     224            dmsg.type = DiscoveryMessage::predecessor;
     225
     226            // send
     227            msg.set_payload_message(dmsg.serialize());
    134228                send(&msg, link);
    135229        }
     
    137231                // send successor discovery
    138232                OverlayMsg msg( typeDiscovery );
    139                 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
     233//              msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );  // XXX this was redundand, wasn't it?
    140234                msg.setRegisterRelay(true);
    141                 Discovery dmsg( Discovery::successor, ttl,
    142                         baseoverlay.getEndpointDescriptor() );
    143                 msg.encapsulate(&dmsg);
     235
     236                // set type
     237        dmsg.type = DiscoveryMessage::successor;
     238
     239        // send
     240        msg.set_payload_message(dmsg.serialize());
    144241                send(&msg, link);
    145242        }
     
    163260
    164261        // timer for stabilization management
    165         Timer::setInterval(1000);
     262//      Timer::setInterval(1000);  // TODO find an appropriate interval!
     263        Timer::setInterval(10000);  // XXX testing...
    166264        Timer::start();
    167265}
     
    200298        return item->info;
    201299}
     300
     301std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode(
     302    const NodeID& id, int num ) const
     303{
     304    std::vector<const LinkID*> ret;
     305   
     306    switch ( num )
     307    {
     308        // special case: just call »getNextLinkId«
     309        case 1:
     310        {
     311            ret.push_back(&getNextLinkId(id));
     312           
     313            break;
     314        }
     315       
     316        // * calculate top 2 *
     317        case 0:
     318        case 2:
     319        {
     320            std::vector<const route_item*> items = table->get_next_2_hops(id);
     321           
     322            ret.reserve(items.size());
     323           
     324            BOOST_FOREACH( const route_item* item, items )
     325            {
     326                ret.push_back(&item->info);
     327            }
     328           
     329            break;
     330        }
     331       
     332        // NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
     333        default:
     334        {
     335            throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");
     336           
     337            break;
     338        }
     339    }
     340   
     341    return ret;
     342}
     343
    202344
    203345/// @see OverlayInterface.h
     
    253395        if (remote==nodeid) {
    254396                logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
     397                logging_warn("NodeID: " << remote);
    255398                baseoverlay.dropLink(lnk);
    256399                return;
     
    290433/// @see CommunicationListener.h or @see OverlayInterface.h
    291434void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
    292         logging_debug("link_down: link=" << lnk.toString() << " remote=" <<
     435    // XXX logging_debug
     436        logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
    293437                        remote.toString() );
    294438
     
    303447/// @see CommunicationListener.h
    304448/// @see OverlayInterface.h
    305 void Chord::onMessage(const DataMessage& msg, const NodeID& remote,
     449void Chord::onMessage(OverlayMsg* msg,
     450        reboost::shared_buffer_t sub_msg,
     451        const NodeID& remote,
    306452                const LinkID& link) {
    307453
    308         // decode message
    309         OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
    310         if (m == NULL) return;
    311 
    312454        // handle messages
    313         switch ((signalMessageTypes)m->getType()) {
     455        switch ((signalMessageTypes) msg->getType()) {
    314456
    315457        // discovery request
    316         case typeDiscovery: {
    317                 // decapsulate message
    318                 Discovery* dmsg = m->decapsulate<Discovery> ();
     458        case typeDiscovery:
     459        {
     460                // deserialize discovery message
     461                DiscoveryMessage dmsg;
     462                dmsg.deserialize(sub_msg);
     463               
    319464                logging_debug("Received discovery message with"
    320                             << " src=" << m->getSourceNode().toString()
    321                                 << " dst=" << m->getDestinationNode().toString()
    322                                 << " ttl=" << (int)dmsg->getTTL()
    323                                 << " type=" << (int)dmsg->getType()
     465                            << " src=" << msg->getSourceNode().toString()
     466                                << " dst=" << msg->getDestinationNode().toString()
     467                                << " ttl=" << (int)dmsg.ttl
     468                                << " type=" << (int)dmsg.type
    324469                );
    325470
     
    327472                bool found = false;
    328473                BOOST_FOREACH( NodeID& value, discovery )
    329                         if (value == m->getSourceNode()) {
     474                        if (value == msg->getSourceNode()) {
    330475                                found = true;
    331476                                break;
    332477                        }
    333                 if (!found) discovery.push_back(m->getSourceNode());
     478                if (!found) discovery.push_back(msg->getSourceNode());
    334479
    335480                // check if source node can be added to routing table and setup link
    336                 if (m->getSourceNode() != nodeid)
    337                         setup( dmsg->getEndpoint(), m->getSourceNode() );
     481                if (msg->getSourceNode() != nodeid)
     482                        setup( dmsg.endpoint, msg->getSourceNode() );
    338483
    339484                // process discovery message -------------------------- switch start --
    340                 switch (dmsg->getType()) {
    341 
    342                 // normal: route discovery message like every other message
    343                 case Discovery::normal: {
    344                         // closest node? yes-> split to follow successor and predecessor
    345                         if ( table->is_closest_to(m->getDestinationNode()) ) {
    346                                 logging_debug("Discovery split:");
    347                                 if (!table->get_successor()->isUnspecified()) {
    348                                         OverlayMsg omsg(*m);
    349                                         dmsg->setType(Discovery::successor);
    350                                         omsg.encapsulate(dmsg);
    351                                         logging_debug("* Routing to successor "
    352                                                         << table->get_successor()->toString() );
    353                                         baseoverlay.send( &omsg, *table->get_successor() );
    354                                 }
    355 
    356                                 // send predecessor message
    357                                 if (!table->get_predesessor()->isUnspecified()) {
    358                                         OverlayMsg omsg(*m);
    359                                         dmsg->setType(Discovery::predecessor);
    360                                         omsg.encapsulate(dmsg);
    361                                         logging_debug("* Routing to predecessor "
    362                                                         << table->get_predesessor()->toString() );
    363                                         baseoverlay.send( &omsg, *table->get_predesessor() );
    364                                 }
    365                         }
    366                         // no-> route message
    367                         else {
    368                                 baseoverlay.route( m );
    369                         }
    370                         break;
    371                 }
    372 
    373                 // successor mode: follow the successor until TTL is zero
    374                 case Discovery::successor:
    375                 case Discovery::predecessor: {
    376                         // reached destination? no->forward!
    377                         if (m->getDestinationNode() != nodeid) {
    378                                 OverlayMsg omsg(*m);
    379                                 omsg.encapsulate(dmsg);
    380                                 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
    381                                 baseoverlay.route( &omsg );
    382                                 break;
    383                         }
    384 
    385                         // time to live ended? yes-> stop routing
    386                         if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break;
    387 
    388                         // decrease time-to-live
    389                         dmsg->setTTL(dmsg->getTTL() - 1);
    390 
    391                         const route_item* item = NULL;
    392                         if (dmsg->getType() == Discovery::successor &&
    393                                         table->get_successor() != NULL) {
    394                                 item = table->get(*table->get_successor());
    395                         } else {
    396                                 if (table->get_predesessor()!=NULL)
    397                                         item = table->get(*table->get_predesessor());
    398                         }
    399                         if (item == NULL)
    400                                 break;
    401 
    402                         logging_debug("Routing discovery message to succ/pred "
    403                                 << item->id.toString() );
    404                         OverlayMsg omsg(*m);
    405                         omsg.encapsulate(dmsg);
    406                         omsg.setDestinationNode(item->id);
    407                         omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
    408                         baseoverlay.send(&omsg, omsg.getDestinationNode());
    409                         break;
    410                 }
    411                 case Discovery::invalid:
    412                         break;
    413 
    414                 default:
    415                         break;
    416                 }
    417                 // process discovery message ---------------------------- switch end --
    418 
    419                 delete dmsg;
    420                 break;
    421         }
    422 
    423         // leave
    424         case typeLeave: {
    425                 if (link!=LinkID::UNSPECIFIED) {
    426                         route_item* item = table->get(remote);
    427                         if (item!=NULL) item->info = LinkID::UNSPECIFIED;
    428                         table->remove(remote);
    429                         baseoverlay.dropLink(link);
    430                 }
    431                 break;
    432         }}
     485                switch ( dmsg.type )
     486                {
     487            // normal: route discovery message like every other message
     488            case DiscoveryMessage::normal:
     489            {
     490                // closest node? yes-> split to follow successor and predecessor
     491                if ( table->is_closest_to(msg->getDestinationNode()) )
     492                {
     493                    logging_debug("Discovery split:");
     494                    if (!table->get_successor()->isUnspecified())
     495                    {
     496                        OverlayMsg omsg(*msg);
     497                       
     498                        dmsg.type = DiscoveryMessage::successor;
     499                        omsg.set_payload_message(dmsg.serialize());
     500
     501                        logging_debug("* Routing to successor "
     502                                << table->get_successor()->toString() );
     503                        send_node( &omsg, *table->get_successor() );
     504                    }
     505   
     506                    // send predecessor message
     507                    if (!table->get_predesessor()->isUnspecified())
     508                    {
     509                        OverlayMsg omsg(*msg);
     510                       
     511                        dmsg.type = DiscoveryMessage::predecessor;
     512                        omsg.set_payload_message(dmsg.serialize());
     513
     514                        logging_debug("* Routing to predecessor "
     515                                << table->get_predesessor()->toString() );
     516                        send_node( &omsg, *table->get_predesessor() );
     517                    }
     518                }
     519                // no-> route message
     520                else
     521                {
     522                    baseoverlay.route( msg );
     523                }
     524                break;
     525            }
     526   
     527            // successor mode: follow the successor until TTL is zero
     528            case DiscoveryMessage::successor:
     529            case DiscoveryMessage::predecessor:
     530            {
     531                // reached destination? no->forward!
     532                if (msg->getDestinationNode() != nodeid)
     533                {
     534                    OverlayMsg omsg(*msg);
     535                    omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
     536                   
     537                    omsg.set_payload_message(dmsg.serialize());
     538                   
     539                    baseoverlay.route( &omsg );
     540                    break;
     541                }
     542   
     543                // time to live ended? yes-> stop routing
     544                if (dmsg.ttl == 0 || dmsg.ttl > 10) break;
     545   
     546                // decrease time-to-live
     547                dmsg.ttl--;
     548   
     549                const route_item* item = NULL;
     550                if (dmsg.type == DiscoveryMessage::successor &&
     551                        table->get_successor() != NULL)
     552                {
     553                    item = table->get(*table->get_successor());
     554                }
     555                else if (table->get_predesessor() != NULL)
     556                {
     557                        item = table->get(*table->get_predesessor());
     558                }
     559                if (item == NULL)
     560                    break;
     561   
     562                logging_debug("Routing discovery message to succ/pred "
     563                    << item->id.toString() );
     564                OverlayMsg omsg(*msg);
     565                omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
     566                omsg.setDestinationNode(item->id);
     567               
     568                omsg.set_payload_message(dmsg.serialize());
     569               
     570                send_node( &omsg, omsg.getDestinationNode() );
     571                break;
     572            }
     573            case DiscoveryMessage::invalid:
     574                break;
     575   
     576            default:
     577                break;
     578            }
     579            // process discovery message ---------------------------- switch end --
     580   
     581            break;
     582        }
     583   
     584        // leave
     585        case typeLeave: {
     586            if (link!=LinkID::UNSPECIFIED) {
     587                route_item* item = table->get(remote);
     588                if (item!=NULL) item->info = LinkID::UNSPECIFIED;
     589                table->remove(remote);
     590                baseoverlay.dropLink(link);
     591            }
     592            break;
     593        }
     594        }
    433595}
    434596
Note: See TracChangeset for help on using the changeset viewer.