Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/Node.cpp

    r10653 r12060  
    3939#include "Node.h"
    4040
     41#include <boost/foreach.hpp>
     42
    4143#include "ariba/overlay/BaseOverlay.h"
     44#include "ariba/communication/BaseCommunication.h"
     45
    4246#include "ariba/utility/types/OverlayParameterSet.h"
    4347#include "ariba/communication/EndpointDescriptor.h"
    4448
     49#include <boost/property_tree/exceptions.hpp>
     50
     51using namespace std;
    4552using ariba::communication::EndpointDescriptor;
     53using boost::property_tree::ptree;
    4654
    4755namespace ariba {
    4856
    49 Node::Node(AribaModule& ariba_mod, const Name& node_name) :
    50         name(node_name), ariba_mod(ariba_mod) {
    51         base_overlay = new BaseOverlay();
     57//Node::Node(AribaModule& ariba_mod, const Name& node_name) :
     58//      name(node_name), ariba_mod(ariba_mod) {
     59//      base_overlay = new BaseOverlay();
     60//}
     61
     62Node::Node()  :
     63        name(Name::UNSPECIFIED),
     64        base_communication(NULL),
     65        base_overlay(NULL)
     66{
     67    base_communication = new BaseCommunication();
     68    base_overlay = new BaseOverlay();
    5269}
    5370
     
    5572        delete base_overlay;
    5673        base_overlay = NULL;
    57 }
    58 
    59 void Node::join(const Name& vnetname) {
    60         spovnetId = vnetname.toSpoVNetId();
    61         nodeId = generateNodeId(name);
    62 
    63         // start base comm if not started
    64         if( !ariba_mod.base_comm->isStarted() )
    65                 ariba_mod.base_comm->start();
    66 
    67         // start base overlay if not started
    68         // join against ourselfs
    69         if( !base_overlay->isStarted() )
    70                 base_overlay->start( *ariba_mod.base_comm, nodeId );
    71         base_overlay->joinSpoVNet( spovnetId );
    72 
    73         // join against static bootstrap points and
    74         // start automatic bootstrapping modules
    75         vector<AribaModule::BootstrapMechanism> mechanisms
    76                 = ariba_mod.getBootstrapMechanisms(vnetname);
    77 
    78         vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
    79 
    80         BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){
    81                 switch(m){
    82                         case AribaModule::BootstrapMechanismStatic:
    83                         {
    84                                 const communication::EndpointDescriptor* ep =
    85                                                         ariba_mod.getBootstrapNode(vnetname, m);
    86                                         if( ep != NULL && ep->isUnspecified() == false )
    87                                                 base_overlay->joinSpoVNet( spovnetId, *ep);
    88                                 break;
    89                         }
    90                         case AribaModule::BootstrapMechanismBroadcast:
    91                                 internalmodules.push_back(make_pair(
    92                                                 BootstrapManager::BootstrapTypePeriodicBroadcast,
    93                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    94                                 break;
    95                         case AribaModule::BootstrapMechanismMulticastDNS:
    96                                 internalmodules.push_back(make_pair(
    97                                                 BootstrapManager::BootstrapTypeMulticastDns,
    98                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    99                                 break;
    100                         case AribaModule::BootstrapMechanismSDP:
    101                                 internalmodules.push_back(make_pair(
    102                                                 BootstrapManager::BootstrapTypeBluetoothSdp,
    103                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    104                                 break;
    105                         default:
    106                                 break;
    107                 }
    108         }
    109 
    110         // start automatic overlay bootstrapping modules
    111         base_overlay->startBootstrapModules(internalmodules);
    112 
    113         // done
    114 }
    115 
    116 void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) {
    117         utility::OverlayParameterSet ovrpset;
    118         ovrpset.setOverlayStructure(
    119                         (utility::OverlayParameterSet::_OverlayStructure)
    120                         parm.getBaseOverlayType()
    121                         );
    122 
    123         spovnetId = vnetname.toSpoVNetId();
    124         nodeId = generateNodeId(name);
    125 
    126         // start base comm if not started
    127         if( !ariba_mod.base_comm->isStarted() )
    128                 ariba_mod.base_comm->start();
    129 
    130         // start base overlay if not started
    131         if( !base_overlay->isStarted() )
    132                 base_overlay->start( *ariba_mod.base_comm, nodeId );
    133 
    134         base_overlay->createSpoVNet( spovnetId, ovrpset );
    135 }
     74       
     75        delete base_communication;
     76        base_communication = NULL;
     77}
     78
     79void Node::connect(const ptree& config)
     80{
     81    using namespace boost::property_tree;
     82    using namespace addressing2;
     83   
     84    assert( ! base_communication->isStarted() );
     85    assert( ! base_overlay->isStarted() );
     86
     87    // XXX needed since »empty_ptree<ptree>()« is not working
     88    //   ---> see: http://stackoverflow.com/questions/5003549/where-is-boost-property-treeempty-ptree
     89    static const ptree empty_pt;
     90   
     91    // SpovNet ID
     92    Name spovnet_name(config.get<string>("spovnet_name"));
     93    spovnetId = spovnet_name.toSpoVNetId();
     94   
     95    // Node ID
     96    try
     97    {
     98        name = config.get<string>("node_name");
     99    }
     100    catch ( ptree_bad_path& e )
     101    {
     102        name = Name::UNSPECIFIED;
     103    }
     104    nodeId = generateNodeId(name);
     105
     106   
     107   
     108    /* Base Communication */
     109    EndpointSetPtr listen_on;
     110    try
     111    {
     112        listen_on = endpoint_set::create_EndpointSet(
     113            config.get_child("listen_on"));
     114    }
     115    catch ( ptree_bad_path& e )
     116    {
     117        /* no endpoints specified, using default: »[::]:41322+« */
     118       
     119        ptree default_listen_on;
     120        default_listen_on.put("endp.category", "TCPIP");
     121        default_listen_on.put("endp.addr", "::");
     122        default_listen_on.put("endp.port", 0);  // defaults to 41322 (or higher)
     123       
     124        listen_on = endpoint_set::create_EndpointSet(default_listen_on);
     125//        logging_warn("No endpoints specified in config. ---> Using default.");
     126        cout << "No endpoints specified in config. ---> Using default." << endl;
     127    }
     128    base_communication->start(listen_on);
     129   
     130    // TODO maybe notify the upper layer whether we have any active endpoints
     131   
     132
     133    /* Base Overlay */
     134    base_overlay->start( base_communication, nodeId );
     135
     136    base_overlay->createSpoVNet( spovnetId );
     137    base_overlay->joinSpoVNet( spovnetId );
     138   
     139   
     140   
     141    /* Bootstrap */
     142    const ptree& bootstrap_pt = config.get_child("bootstrap", empty_pt);
     143   
     144    // Static Bootstrap
     145    try
     146    {
     147        // read endpoint_set from config
     148        EndpointSetPtr ep_set = endpoint_set::create_EndpointSet(
     149                bootstrap_pt.get_child("direct"));
     150       
     151        EndpointDescriptor ep = EndpointDescriptor::UNSPECIFIED();
     152        ep.replace_endpoint_set(ep_set);
     153       
     154        // try to connect
     155        base_overlay->joinSpoVNet( spovnetId, ep);
     156    }
     157    catch ( ptree_bad_path& e )
     158    {
     159//        logging_info("No direct bootstrap info in config.");
     160        cout << "No direct bootstrap info in config." << endl;
     161    }
     162
     163   
     164    /* Bootstrap modules */
     165    vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
     166   
     167    // Bootstrap: Broadcast
     168    if ( bootstrap_pt.get("broadcast", false) )
     169    {
     170        internalmodules.push_back(make_pair(
     171                BootstrapManager::BootstrapTypePeriodicBroadcast,""));
     172    }
     173   
     174    // Bootstrap: MDNS
     175    if ( bootstrap_pt.get("mdns", false) )
     176    {
     177        internalmodules.push_back(make_pair(
     178                BootstrapManager::BootstrapTypeMulticastDns,""));
     179    }
     180
     181    // Bootstrap: SDP
     182    if ( bootstrap_pt.get("sdp", false) )
     183    {
     184        internalmodules.push_back(make_pair(
     185                BootstrapManager::BootstrapTypeBluetoothSdp,""));
     186    }
     187
     188    // start automatic overlay bootstrapping modules
     189    base_overlay->startBootstrapModules(internalmodules);
     190}
     191
     192//void Node::join(const Name& vnetname) {
     193//      spovnetId = vnetname.toSpoVNetId();
     194//      nodeId = generateNodeId(name);
     195//
     196//      // start base comm if not started
     197//      if( !ariba_mod.base_comm->isStarted() )
     198//              ariba_mod.base_comm->start();
     199//
     200//      // start base overlay if not started
     201//      // join against ourselfs
     202//      if( !base_overlay->isStarted() )
     203//              base_overlay->start( *ariba_mod.base_comm, nodeId );
     204//      base_overlay->joinSpoVNet( spovnetId );
     205//
     206//      // join against static bootstrap points and
     207//      // start automatic bootstrapping modules
     208//      vector<AribaModule::BootstrapMechanism> mechanisms
     209//              = ariba_mod.getBootstrapMechanisms(vnetname);
     210//
     211//      vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
     212//
     213//      BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){
     214//              switch(m){
     215//                      case AribaModule::BootstrapMechanismStatic:
     216//                      {
     217//                              const communication::EndpointDescriptor* ep =
     218//                                                      ariba_mod.getBootstrapNode(vnetname, m);
     219//                                      if( ep != NULL && ep->isUnspecified() == false )
     220//                                              base_overlay->joinSpoVNet( spovnetId, *ep);
     221//                              break;
     222//                      }
     223//                      case AribaModule::BootstrapMechanismBroadcast:
     224//                              internalmodules.push_back(make_pair(
     225//                                              BootstrapManager::BootstrapTypePeriodicBroadcast,
     226//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     227//                              break;
     228//                      case AribaModule::BootstrapMechanismMulticastDNS:
     229//                              internalmodules.push_back(make_pair(
     230//                                              BootstrapManager::BootstrapTypeMulticastDns,
     231//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     232//                              break;
     233//                      case AribaModule::BootstrapMechanismSDP:
     234//                              internalmodules.push_back(make_pair(
     235//                                              BootstrapManager::BootstrapTypeBluetoothSdp,
     236//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     237//                              break;
     238//                      default:
     239//                              break;
     240//              }
     241//      }
     242//
     243//      // start automatic overlay bootstrapping modules
     244//      base_overlay->startBootstrapModules(internalmodules);
     245//
     246//      // done
     247//}
     248//
     249//void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) {
     250//      utility::OverlayParameterSet ovrpset;
     251//      ovrpset.setOverlayStructure(
     252//                      (utility::OverlayParameterSet::_OverlayStructure)
     253//                      parm.getBaseOverlayType()
     254//                      );
     255//
     256//      spovnetId = vnetname.toSpoVNetId();
     257//      nodeId = generateNodeId(name);
     258//
     259//      // start base comm if not started
     260//      if( !ariba_mod.base_comm->isStarted() )
     261//              ariba_mod.base_comm->start();
     262//
     263//      // start base overlay if not started
     264//      if( !base_overlay->isStarted() )
     265//              base_overlay->start( *ariba_mod.base_comm, nodeId );
     266//
     267//      base_overlay->createSpoVNet( spovnetId, ovrpset );
     268//}
    136269
    137270void Node::leave() {
    138271        base_overlay->stopBootstrapModules();
    139272        base_overlay->leaveSpoVNet();
    140         ariba_mod.base_comm->stop();
     273        base_communication->stop();  // XXX before »base_overlay->stop()« ??
    141274        base_overlay->stop();
    142275}
     
    172305}
    173306
     307bool Node::isLinkDirect(const ariba::LinkID& lnk) const
     308{
     309    return base_overlay->isLinkDirect(lnk);
     310}
     311
     312int Node::getHopCount(const ariba::LinkID& lnk) const
     313{
     314    return base_overlay->getHopCount(lnk);
     315}
     316
     317
     318
     319
     320/* +++++ Message sending +++++ */
     321void Node::check_send_priority(uint8_t priority)
     322{
     323    if ( priority < send_priority::HIGHEST || priority > send_priority::LOWEST )
     324        throw std::invalid_argument("Illegal priority");     
     325}
     326
     327
     328// +++ new interface +++
     329const SequenceNumber& Node::sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority)
     330{
     331    // check priority
     332    check_send_priority(priority);
     333       
     334    // * call base overlay *
     335    return base_overlay->sendMessage(msg, lnk, priority);
     336}
     337
     338// +++ legacy interface +++
     339seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk)
     340{
     341    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     342   
     343    try
     344    {
     345        base_overlay->sendMessage(message, lnk, send_priority::NORMAL);
     346        return 0;
     347    }
     348    catch ( ariba::overlay::message_not_sent& e )
     349    {
     350        logging_warn("Message could not be sent. Dropped.");
     351        return -1;
     352    }
     353}
     354
     355
     356// +++ new interface +++
     357const SequenceNumber& Node::sendMessage(reboost::message_t msg, const NodeID& nid,
     358        const ServiceID& sid, uint8_t priority, const LinkProperties& req) {
     359
     360    // check priority
     361    check_send_priority(priority);
     362       
     363    // * call base overlay *
     364    return base_overlay->sendMessage(msg, nid, priority, sid);
     365}
     366
     367// +++ legacy interface +++
    174368seqnum_t Node::sendMessage(const DataMessage& msg, const NodeID& nid,
    175369                const ServiceID& sid, const LinkProperties& req) {
    176         return base_overlay->sendMessage((Message*) msg, nid, sid);
    177 }
    178 
     370   
     371//    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     372    reboost::message_t message;
     373    message.push_back( ((Message*) msg)->serialize_into_shared_buffer() );
     374   
     375    try
     376    {
     377        sendMessage(message, nid, sid, send_priority::NORMAL, req);
     378        return 0;
     379    }
     380    catch ( ariba::overlay::message_not_sent& e )
     381    {
     382        logging_warn("Message could not be sent. Dropped.");
     383        return -1;
     384    }
     385}
     386
     387
     388// +++ new interface +++
     389NodeID Node::sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid,
     390        uint8_t priority, const LinkProperties& req) {
     391
     392    // check priority
     393    check_send_priority(priority);
     394       
     395    // * call base overlay *
     396    return base_overlay->sendMessageCloserToNodeID(msg, nid, priority, sid);
     397}
     398
     399// +++ legacy interface +++
    179400NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
    180401        const LinkProperties& req) {
    181402   
    182     return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid);
    183 }
    184 
    185 
    186 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) {
    187         return base_overlay->sendMessage((Message*) msg, lnk);
    188 }
    189 
     403    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     404   
     405    return sendMessageCloserToNodeID(message, nid, sid, send_priority::NORMAL, req);
     406}
     407
     408
     409// +++ new interface +++
     410void Node::sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority) {
     411
     412    // check priority
     413    check_send_priority(priority);
     414       
     415    // * call base overlay *
     416    return base_overlay->broadcastMessage(msg, sid, priority);
     417}
     418
     419// +++ legacy interface +++
    190420void Node::sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid) {
    191         return base_overlay->broadcastMessage((Message*)msg, sid);
    192 }
     421    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     422
     423        return sendBroadcastMessage(message, sid);
     424}
     425
     426
     427/* +++++ [Message sending] +++++ */
     428
     429
     430
    193431
    194432bool Node::bind(NodeListener* listener) {
     
    204442        bool ret = base_overlay->bind(listener, sid);
    205443
    206         // now that we have a listener, we can ask if sniffing is ok
    207         if( ariba_mod.sideport_sniffer != NULL ){
    208                 base_overlay->registerSidePort(ariba_mod.sideport_sniffer);
    209         }
     444//      // now that we have a listener, we can ask if sniffing is ok
     445//      if( ariba_mod.sideport_sniffer != NULL ){
     446//              base_overlay->registerSidePort(ariba_mod.sideport_sniffer);
     447//      }
    210448
    211449        return ret;
Note: See TracChangeset for help on using the changeset viewer.