Changeset 12060 for source


Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (12 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source
Files:
18 added
11 deleted
73 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/CMakeLists.txt

    r11885 r12060  
    4646### Increment this whenever the interface changes! ###
    4747######################################################
    48 set(ariba_SOVERSION 1)
     48set(ariba_SOVERSION 2)
    4949######################################################
    5050
     
    184184add_headers(
    185185    ariba.h
    186     AribaModule.h
     186#    AribaModule.h
    187187    CommunicationListener.h
    188188    DataMessage.h
     
    199199
    200200add_sources(
    201     AribaModule.cpp
     201#    AribaModule.cpp
    202202    CommunicationListener.cpp
    203203    DataMessage.cpp
  • source/ariba/CommunicationListener.cpp

    r7468 r12060  
    5959
    6060void CommunicationListener::onLinkFail(const LinkID& l, const NodeID& r) {
     61    onLinkDown(l, r);
    6162}
    6263
     
    6970}
    7071
     72// this implementation provides backward compatibility
     73// overwrite it to use the new interface
     74void CommunicationListener::onMessage(reboost::shared_buffer_t message,
     75        const NodeID& remote,
     76        const LinkID& lnk,
     77        const SequenceNumber& seqnum,
     78        const ariba::overlay::OverlayMsg* overlay_msg)
     79{
     80    // copy data
     81    Data data;
     82    data.setLength(message.size() * 8);
     83    memcpy(data.getBuffer(), message.data(), message.size());
     84   
     85    // and prepare old-style overlay message
     86    Message legacy_msg;
     87    legacy_msg.setPayload(data);
     88   
     89   
     90    // * call legacy handler *
     91    this->onMessage(legacy_msg, remote, lnk);
     92}
     93
    7194void CommunicationListener::onKeyValue( const Data& key, const vector<Data>& value ) {
    7295}
    7396
     97bool CommunicationListener::onPing(const NodeID& remote)
     98{
     99    return true;
     100}
     101
     102void CommunicationListener::onPingLost(const NodeID& remote)
     103{
     104}
     105
     106void CommunicationListener::onPong(const NodeID& remote)
     107{
     108}
     109
    74110} // namespace ariba
  • source/ariba/CommunicationListener.h

    r9684 r12060  
    4444#include "LinkProperties.h"
    4545#include "DataMessage.h"
     46#include "ariba/overlay/SequenceNumber.h"
    4647
    4748namespace ariba {
     49
     50typedef ariba::overlay::SequenceNumber SequenceNumber;
    4851
    4952// forward decl
    5053namespace overlay {
    5154        class BaseOverlay;
     55        class OverlayMsg;
    5256}
    5357
     
    116120
    117121        /**
     122         * @DEPRECATED
     123         *
    118124         * Called when a message is incoming
    119125         * @param msg The data message that is received
     
    123129        virtual void onMessage(const DataMessage& msg, const NodeID& remote,
    124130                        const LinkID& lnk = LinkID::UNSPECIFIED);
     131
     132
     133    /**
     134     * NOTE: This interface is still unstable and may change in future releases.
     135     *   ---> Especially the parameter «overlay_msg» may be replaced or removed.
     136     *   
     137     * Called when a message is incoming
     138     * @param msg A shared buffer with the (serialized) payload
     139     * @param remote The remote node that sent the message
     140     * @param lnk The link id of the link where the message is received
     141     * @param overlay_msg A pointer to the ariba internal Overlay Message
     142     */
     143    virtual void onMessage(reboost::shared_buffer_t message,
     144            const NodeID& remote,
     145            const LinkID& lnk,
     146            const SequenceNumber& seqnum,
     147            const ariba::overlay::OverlayMsg* overlay_msg);
    125148
    126149        // --- dht functionality ---
     
    133156        virtual void onKeyValue( const Data& key, const vector<Data>& value );
    134157
     158    // --- ping pong (routed) ---
     159    /**
     160     * Called when a ping message was received.
     161     *
     162     * @param remote Node which sent the ping message
     163     * @return bool Allows to send a pong message (default: true)
     164     */
     165    virtual bool onPing(const NodeID& remote);
     166
     167    /**
     168     * Called when a ping message is lost.
     169     *
     170     * NOTE: A ping message could also get lost without notice.
     171     *
     172     * @param remote Node to which the ping message was sent
     173     */
     174    virtual void onPingLost(const NodeID& remote);
     175   
     176    /**
     177     * Called when a pong message was received.
     178     *
     179     * @param remote Node which sent the pong message
     180     */
     181    virtual void onPong(const NodeID& remote);
    135182};
    136183
  • source/ariba/DataMessage.h

    r9684 r12060  
    99// use message utility
    1010#ifdef USE_MESSAGE_UTILITY
    11   #include "ariba/utility/messages.h"
     11//  #include "ariba/utility/messages.h"
     12  #include "ariba/utility/messages/Message.h"
    1213  namespace ariba {
    1314    typedef utility::Message Message;
  • source/ariba/Message.h

    r9684 r12060  
    4141
    4242#include <inttypes.h>
    43 #include "ariba/utility/messages.h"
     43
     44// legacy messages
     45#include "ariba/utility/messages/Message.h"
     46// reboost messages
     47#include "ariba/utility/transport/messages/message.hpp"
    4448
    4549/** \addtogroup public
  • source/ariba/Node.cpp

    r10653 r12060  
    3939#include "Node.h"
    4040
     41#include <boost/foreach.hpp>
     42
    4143#include "ariba/overlay/BaseOverlay.h"
     44#include "ariba/communication/BaseCommunication.h"
     45
    4246#include "ariba/utility/types/OverlayParameterSet.h"
    4347#include "ariba/communication/EndpointDescriptor.h"
    4448
     49#include <boost/property_tree/exceptions.hpp>
     50
     51using namespace std;
    4552using ariba::communication::EndpointDescriptor;
     53using boost::property_tree::ptree;
    4654
    4755namespace ariba {
    4856
    49 Node::Node(AribaModule& ariba_mod, const Name& node_name) :
    50         name(node_name), ariba_mod(ariba_mod) {
    51         base_overlay = new BaseOverlay();
     57//Node::Node(AribaModule& ariba_mod, const Name& node_name) :
     58//      name(node_name), ariba_mod(ariba_mod) {
     59//      base_overlay = new BaseOverlay();
     60//}
     61
     62Node::Node()  :
     63        name(Name::UNSPECIFIED),
     64        base_communication(NULL),
     65        base_overlay(NULL)
     66{
     67    base_communication = new BaseCommunication();
     68    base_overlay = new BaseOverlay();
    5269}
    5370
     
    5572        delete base_overlay;
    5673        base_overlay = NULL;
    57 }
    58 
    59 void Node::join(const Name& vnetname) {
    60         spovnetId = vnetname.toSpoVNetId();
    61         nodeId = generateNodeId(name);
    62 
    63         // start base comm if not started
    64         if( !ariba_mod.base_comm->isStarted() )
    65                 ariba_mod.base_comm->start();
    66 
    67         // start base overlay if not started
    68         // join against ourselfs
    69         if( !base_overlay->isStarted() )
    70                 base_overlay->start( *ariba_mod.base_comm, nodeId );
    71         base_overlay->joinSpoVNet( spovnetId );
    72 
    73         // join against static bootstrap points and
    74         // start automatic bootstrapping modules
    75         vector<AribaModule::BootstrapMechanism> mechanisms
    76                 = ariba_mod.getBootstrapMechanisms(vnetname);
    77 
    78         vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
    79 
    80         BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){
    81                 switch(m){
    82                         case AribaModule::BootstrapMechanismStatic:
    83                         {
    84                                 const communication::EndpointDescriptor* ep =
    85                                                         ariba_mod.getBootstrapNode(vnetname, m);
    86                                         if( ep != NULL && ep->isUnspecified() == false )
    87                                                 base_overlay->joinSpoVNet( spovnetId, *ep);
    88                                 break;
    89                         }
    90                         case AribaModule::BootstrapMechanismBroadcast:
    91                                 internalmodules.push_back(make_pair(
    92                                                 BootstrapManager::BootstrapTypePeriodicBroadcast,
    93                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    94                                 break;
    95                         case AribaModule::BootstrapMechanismMulticastDNS:
    96                                 internalmodules.push_back(make_pair(
    97                                                 BootstrapManager::BootstrapTypeMulticastDns,
    98                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    99                                 break;
    100                         case AribaModule::BootstrapMechanismSDP:
    101                                 internalmodules.push_back(make_pair(
    102                                                 BootstrapManager::BootstrapTypeBluetoothSdp,
    103                                                 ariba_mod.getBootstrapInfo(vnetname, m)));
    104                                 break;
    105                         default:
    106                                 break;
    107                 }
    108         }
    109 
    110         // start automatic overlay bootstrapping modules
    111         base_overlay->startBootstrapModules(internalmodules);
    112 
    113         // done
    114 }
    115 
    116 void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) {
    117         utility::OverlayParameterSet ovrpset;
    118         ovrpset.setOverlayStructure(
    119                         (utility::OverlayParameterSet::_OverlayStructure)
    120                         parm.getBaseOverlayType()
    121                         );
    122 
    123         spovnetId = vnetname.toSpoVNetId();
    124         nodeId = generateNodeId(name);
    125 
    126         // start base comm if not started
    127         if( !ariba_mod.base_comm->isStarted() )
    128                 ariba_mod.base_comm->start();
    129 
    130         // start base overlay if not started
    131         if( !base_overlay->isStarted() )
    132                 base_overlay->start( *ariba_mod.base_comm, nodeId );
    133 
    134         base_overlay->createSpoVNet( spovnetId, ovrpset );
    135 }
     74       
     75        delete base_communication;
     76        base_communication = NULL;
     77}
     78
     79void Node::connect(const ptree& config)
     80{
     81    using namespace boost::property_tree;
     82    using namespace addressing2;
     83   
     84    assert( ! base_communication->isStarted() );
     85    assert( ! base_overlay->isStarted() );
     86
     87    // XXX needed since »empty_ptree<ptree>()« is not working
     88    //   ---> see: http://stackoverflow.com/questions/5003549/where-is-boost-property-treeempty-ptree
     89    static const ptree empty_pt;
     90   
     91    // SpovNet ID
     92    Name spovnet_name(config.get<string>("spovnet_name"));
     93    spovnetId = spovnet_name.toSpoVNetId();
     94   
     95    // Node ID
     96    try
     97    {
     98        name = config.get<string>("node_name");
     99    }
     100    catch ( ptree_bad_path& e )
     101    {
     102        name = Name::UNSPECIFIED;
     103    }
     104    nodeId = generateNodeId(name);
     105
     106   
     107   
     108    /* Base Communication */
     109    EndpointSetPtr listen_on;
     110    try
     111    {
     112        listen_on = endpoint_set::create_EndpointSet(
     113            config.get_child("listen_on"));
     114    }
     115    catch ( ptree_bad_path& e )
     116    {
     117        /* no endpoints specified, using default: »[::]:41322+« */
     118       
     119        ptree default_listen_on;
     120        default_listen_on.put("endp.category", "TCPIP");
     121        default_listen_on.put("endp.addr", "::");
     122        default_listen_on.put("endp.port", 0);  // defaults to 41322 (or higher)
     123       
     124        listen_on = endpoint_set::create_EndpointSet(default_listen_on);
     125//        logging_warn("No endpoints specified in config. ---> Using default.");
     126        cout << "No endpoints specified in config. ---> Using default." << endl;
     127    }
     128    base_communication->start(listen_on);
     129   
     130    // TODO maybe notify the upper layer whether we have any active endpoints
     131   
     132
     133    /* Base Overlay */
     134    base_overlay->start( base_communication, nodeId );
     135
     136    base_overlay->createSpoVNet( spovnetId );
     137    base_overlay->joinSpoVNet( spovnetId );
     138   
     139   
     140   
     141    /* Bootstrap */
     142    const ptree& bootstrap_pt = config.get_child("bootstrap", empty_pt);
     143   
     144    // Static Bootstrap
     145    try
     146    {
     147        // read endpoint_set from config
     148        EndpointSetPtr ep_set = endpoint_set::create_EndpointSet(
     149                bootstrap_pt.get_child("direct"));
     150       
     151        EndpointDescriptor ep = EndpointDescriptor::UNSPECIFIED();
     152        ep.replace_endpoint_set(ep_set);
     153       
     154        // try to connect
     155        base_overlay->joinSpoVNet( spovnetId, ep);
     156    }
     157    catch ( ptree_bad_path& e )
     158    {
     159//        logging_info("No direct bootstrap info in config.");
     160        cout << "No direct bootstrap info in config." << endl;
     161    }
     162
     163   
     164    /* Bootstrap modules */
     165    vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
     166   
     167    // Bootstrap: Broadcast
     168    if ( bootstrap_pt.get("broadcast", false) )
     169    {
     170        internalmodules.push_back(make_pair(
     171                BootstrapManager::BootstrapTypePeriodicBroadcast,""));
     172    }
     173   
     174    // Bootstrap: MDNS
     175    if ( bootstrap_pt.get("mdns", false) )
     176    {
     177        internalmodules.push_back(make_pair(
     178                BootstrapManager::BootstrapTypeMulticastDns,""));
     179    }
     180
     181    // Bootstrap: SDP
     182    if ( bootstrap_pt.get("sdp", false) )
     183    {
     184        internalmodules.push_back(make_pair(
     185                BootstrapManager::BootstrapTypeBluetoothSdp,""));
     186    }
     187
     188    // start automatic overlay bootstrapping modules
     189    base_overlay->startBootstrapModules(internalmodules);
     190}
     191
     192//void Node::join(const Name& vnetname) {
     193//      spovnetId = vnetname.toSpoVNetId();
     194//      nodeId = generateNodeId(name);
     195//
     196//      // start base comm if not started
     197//      if( !ariba_mod.base_comm->isStarted() )
     198//              ariba_mod.base_comm->start();
     199//
     200//      // start base overlay if not started
     201//      // join against ourselfs
     202//      if( !base_overlay->isStarted() )
     203//              base_overlay->start( *ariba_mod.base_comm, nodeId );
     204//      base_overlay->joinSpoVNet( spovnetId );
     205//
     206//      // join against static bootstrap points and
     207//      // start automatic bootstrapping modules
     208//      vector<AribaModule::BootstrapMechanism> mechanisms
     209//              = ariba_mod.getBootstrapMechanisms(vnetname);
     210//
     211//      vector<pair<BootstrapManager::BootstrapType,string> > internalmodules;
     212//
     213//      BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){
     214//              switch(m){
     215//                      case AribaModule::BootstrapMechanismStatic:
     216//                      {
     217//                              const communication::EndpointDescriptor* ep =
     218//                                                      ariba_mod.getBootstrapNode(vnetname, m);
     219//                                      if( ep != NULL && ep->isUnspecified() == false )
     220//                                              base_overlay->joinSpoVNet( spovnetId, *ep);
     221//                              break;
     222//                      }
     223//                      case AribaModule::BootstrapMechanismBroadcast:
     224//                              internalmodules.push_back(make_pair(
     225//                                              BootstrapManager::BootstrapTypePeriodicBroadcast,
     226//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     227//                              break;
     228//                      case AribaModule::BootstrapMechanismMulticastDNS:
     229//                              internalmodules.push_back(make_pair(
     230//                                              BootstrapManager::BootstrapTypeMulticastDns,
     231//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     232//                              break;
     233//                      case AribaModule::BootstrapMechanismSDP:
     234//                              internalmodules.push_back(make_pair(
     235//                                              BootstrapManager::BootstrapTypeBluetoothSdp,
     236//                                              ariba_mod.getBootstrapInfo(vnetname, m)));
     237//                              break;
     238//                      default:
     239//                              break;
     240//              }
     241//      }
     242//
     243//      // start automatic overlay bootstrapping modules
     244//      base_overlay->startBootstrapModules(internalmodules);
     245//
     246//      // done
     247//}
     248//
     249//void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) {
     250//      utility::OverlayParameterSet ovrpset;
     251//      ovrpset.setOverlayStructure(
     252//                      (utility::OverlayParameterSet::_OverlayStructure)
     253//                      parm.getBaseOverlayType()
     254//                      );
     255//
     256//      spovnetId = vnetname.toSpoVNetId();
     257//      nodeId = generateNodeId(name);
     258//
     259//      // start base comm if not started
     260//      if( !ariba_mod.base_comm->isStarted() )
     261//              ariba_mod.base_comm->start();
     262//
     263//      // start base overlay if not started
     264//      if( !base_overlay->isStarted() )
     265//              base_overlay->start( *ariba_mod.base_comm, nodeId );
     266//
     267//      base_overlay->createSpoVNet( spovnetId, ovrpset );
     268//}
    136269
    137270void Node::leave() {
    138271        base_overlay->stopBootstrapModules();
    139272        base_overlay->leaveSpoVNet();
    140         ariba_mod.base_comm->stop();
     273        base_communication->stop();  // XXX before »base_overlay->stop()« ??
    141274        base_overlay->stop();
    142275}
     
    172305}
    173306
     307bool Node::isLinkDirect(const ariba::LinkID& lnk) const
     308{
     309    return base_overlay->isLinkDirect(lnk);
     310}
     311
     312int Node::getHopCount(const ariba::LinkID& lnk) const
     313{
     314    return base_overlay->getHopCount(lnk);
     315}
     316
     317
     318
     319
     320/* +++++ Message sending +++++ */
     321void Node::check_send_priority(uint8_t priority)
     322{
     323    if ( priority < send_priority::HIGHEST || priority > send_priority::LOWEST )
     324        throw std::invalid_argument("Illegal priority");     
     325}
     326
     327
     328// +++ new interface +++
     329const SequenceNumber& Node::sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority)
     330{
     331    // check priority
     332    check_send_priority(priority);
     333       
     334    // * call base overlay *
     335    return base_overlay->sendMessage(msg, lnk, priority);
     336}
     337
     338// +++ legacy interface +++
     339seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk)
     340{
     341    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     342   
     343    try
     344    {
     345        base_overlay->sendMessage(message, lnk, send_priority::NORMAL);
     346        return 0;
     347    }
     348    catch ( ariba::overlay::message_not_sent& e )
     349    {
     350        logging_warn("Message could not be sent. Dropped.");
     351        return -1;
     352    }
     353}
     354
     355
     356// +++ new interface +++
     357const SequenceNumber& Node::sendMessage(reboost::message_t msg, const NodeID& nid,
     358        const ServiceID& sid, uint8_t priority, const LinkProperties& req) {
     359
     360    // check priority
     361    check_send_priority(priority);
     362       
     363    // * call base overlay *
     364    return base_overlay->sendMessage(msg, nid, priority, sid);
     365}
     366
     367// +++ legacy interface +++
    174368seqnum_t Node::sendMessage(const DataMessage& msg, const NodeID& nid,
    175369                const ServiceID& sid, const LinkProperties& req) {
    176         return base_overlay->sendMessage((Message*) msg, nid, sid);
    177 }
    178 
     370   
     371//    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     372    reboost::message_t message;
     373    message.push_back( ((Message*) msg)->serialize_into_shared_buffer() );
     374   
     375    try
     376    {
     377        sendMessage(message, nid, sid, send_priority::NORMAL, req);
     378        return 0;
     379    }
     380    catch ( ariba::overlay::message_not_sent& e )
     381    {
     382        logging_warn("Message could not be sent. Dropped.");
     383        return -1;
     384    }
     385}
     386
     387
     388// +++ new interface +++
     389NodeID Node::sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid,
     390        uint8_t priority, const LinkProperties& req) {
     391
     392    // check priority
     393    check_send_priority(priority);
     394       
     395    // * call base overlay *
     396    return base_overlay->sendMessageCloserToNodeID(msg, nid, priority, sid);
     397}
     398
     399// +++ legacy interface +++
    179400NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
    180401        const LinkProperties& req) {
    181402   
    182     return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid);
    183 }
    184 
    185 
    186 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) {
    187         return base_overlay->sendMessage((Message*) msg, lnk);
    188 }
    189 
     403    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     404   
     405    return sendMessageCloserToNodeID(message, nid, sid, send_priority::NORMAL, req);
     406}
     407
     408
     409// +++ new interface +++
     410void Node::sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority) {
     411
     412    // check priority
     413    check_send_priority(priority);
     414       
     415    // * call base overlay *
     416    return base_overlay->broadcastMessage(msg, sid, priority);
     417}
     418
     419// +++ legacy interface +++
    190420void Node::sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid) {
    191         return base_overlay->broadcastMessage((Message*)msg, sid);
    192 }
     421    reboost::message_t message = ((Message*) msg)->wrap_up_for_sending();
     422
     423        return sendBroadcastMessage(message, sid);
     424}
     425
     426
     427/* +++++ [Message sending] +++++ */
     428
     429
     430
    193431
    194432bool Node::bind(NodeListener* listener) {
     
    204442        bool ret = base_overlay->bind(listener, sid);
    205443
    206         // now that we have a listener, we can ask if sniffing is ok
    207         if( ariba_mod.sideport_sniffer != NULL ){
    208                 base_overlay->registerSidePort(ariba_mod.sideport_sniffer);
    209         }
     444//      // now that we have a listener, we can ask if sniffing is ok
     445//      if( ariba_mod.sideport_sniffer != NULL ){
     446//              base_overlay->registerSidePort(ariba_mod.sideport_sniffer);
     447//      }
    210448
    211449        return ret;
  • source/ariba/Node.h

    r10653 r12060  
    4343namespace ariba {
    4444        class Node;
    45         namespace overlay {
    46                 class BaseOverlay;
    47         }
     45    namespace communication {
     46        class BaseCommunication;
     47    }
     48    namespace overlay {
     49        class BaseOverlay;
     50    }
    4851}
    4952
    5053#include <vector>
    5154#include <iostream>
    52 #include <boost/foreach.hpp>
     55#include <exception>
     56
    5357#include "Module.h"
    5458#include "Identifiers.h"
     
    5660#include "NodeListener.h"
    5761#include "Name.h"
    58 #include "AribaModule.h"
     62//#include "AribaModule.h"
    5963#include "CommunicationListener.h"
    6064#include "DataMessage.h"
    6165#include "SideportListener.h"
     66#include "ariba/overlay/SequenceNumber.h"
     67
     68// reboost messages
     69#include "ariba/utility/transport/messages/message.hpp"
     70
     71#include <boost/property_tree/ptree.hpp>
    6272
    6373using std::vector;
     
    6575
    6676namespace ariba {
     77
     78// typedef ariba::overlay::SequenceNumber SequenceNumber; // XXX see CommunicationListener
     79
     80using boost::property_tree::ptree;
     81
     82// sendMessage-Priorities
     83struct send_priority
     84{
     85    enum SEND_PRIORITY
     86    {
     87        HIGHEST = 2,
     88        HIGHER = 3,
     89        NORMAL = 4,
     90        LOWER = 5,
     91        LOWEST = 6
     92    };
     93};
     94
     95
    6796
    6897/**
     
    75104 * @author Christoph Mayer <mayer@tm.uka.de>
    76105 */
     106// TODO do we really want to inherit from Module.. ?
    77107class Node: public Module {
    78108public:
     109   
    79110        /**
    80111         * Constructs a new node using a given ariba module
     
    86117         *   is a zero-terminated char-string.
    87118         */
    88         Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED);
     119//      Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED);
     120       
     121        // XXX EXPERIMENTAL
     122        Node();
    89123
    90124        /**
     
    100134        //--- node control ---
    101135
     136    /**
     137     * XXX EXPERIMENTAL
     138     *
     139     * Replaces initialte & join
     140     */
     141    void connect(const ptree& config);
     142
     143    // XXX DEPRECATED
    102144        /**
    103145         * This method instructs the node to join a particular spovnet.
     
    107149         * @param vnetId The SpoVNet name
    108150         */
    109         void join(const Name& name);
     151//      void join(const Name& name);
    110152
    111153        /**
     
    116158         * @param param The SpoVNet properties
    117159         */
    118         void initiate(const Name& name, const SpoVNetProperties& parm =
    119                         SpoVNetProperties::DEFAULT);
     160//      void initiate(const Name& name, const SpoVNetProperties& parm =
     161//                      SpoVNetProperties::DEFAULT);
    120162
    121163        /**
     
    221263        void dropLink(const LinkID& lnk);
    222264
    223         // message sending
    224 
    225         /**
    226          * Sends a one-shot message to a service. If link properties are specified,
    227          * the node tries to fulfill those requirements. This may cause the node
    228          * to first establish a temporary link, second sending the message and last
    229          * dropping the link. This would result in a small amount of extra latency
    230          * until the message is delivered. If reliable transport was selected,
    231          * the method returns a sequence number and a communication event is
    232          * triggered on message delivery or loss.
    233          *
    234          * @param msg The message to be sent
    235          * @param nid The remote node identifier
    236          * @param sid The remote service identifier
    237          * @param req The requirements associated with the message
    238          * @return A sequence number
    239          */
     265        /**
     266         * Returns whether a link is direct or relayed over other nodes
     267         * @param lnk LinkID of the link
     268         * @return true if link is direct; false otherwise
     269         */
     270        bool isLinkDirect(const ariba::LinkID& lnk) const;
     271       
     272        /**
     273         * Returns the latest measured hop count on this link.
     274         * NOTE: This is not guaranteed to be up to date.
     275         *
     276         * @param lnk LinkID of the link
     277         * @return overlay hop count on this link
     278         */
     279        int getHopCount(const ariba::LinkID& lnk) const;
     280       
     281       
     282        /* +++++ Message sending +++++ */
     283
     284       
     285    /**
     286     * Sends a message via an established link. If reliable transport was
     287     * selected, the method returns a sequence number and a communication event
     288     * is triggered on message delivery or loss.
     289     *
     290     * +++ New interface, using efficient zero-copy reboost messages. +++
     291     *
     292     * @param msg The message to be sent
     293     * @param lnk The link to be used for sending the message
     294     */
     295    const SequenceNumber& sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority=send_priority::NORMAL);
     296
     297    /**
     298     * +++ Legacy interface, converts old ariba messages into new reboost messages. +++
     299     */   
     300    seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk);
     301
     302   
     303    /**
     304     * Sends a one-shot message to a service. If link properties are specified,
     305     * the node tries to fulfill those requirements. This may cause the node
     306     * to first establish a temporary link, second sending the message and last
     307     * dropping the link. This would result in a small amount of extra latency
     308     * until the message is delivered. If reliable transport was selected,
     309     * the method returns a sequence number and a communication event is
     310     * triggered on message delivery or loss.
     311     *
     312     * +++ New interface, using efficient zero-copy reboost messages. +++
     313     *
     314     * @param msg The message to be sent
     315     * @param nid The remote node identifier
     316     * @param sid The remote service identifier
     317     * @param req The requirements associated with the message
     318     * @return A sequence number
     319     */
     320    const SequenceNumber& sendMessage(reboost::message_t msg, const NodeID& nid, const ServiceID& sid,
     321            uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT);
     322
     323    /**
     324     * +++ Legacy interface, converts old ariba messages into new reboost messages. +++
     325     */
    240326        seqnum_t sendMessage(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
    241327                        const LinkProperties& req = LinkProperties::DEFAULT);
    242328
    243         /**
    244          * like the above function, but sends the message to the closest directly known node
    245          * to the specified address
    246          */
     329   
     330    /**
     331     * like the above function, but sends the message to the closest directly known node
     332     * to the specified address
     333     *
     334     * +++ New interface, using efficient zero-copy reboost messages. +++
     335     *
     336     */
     337    NodeID sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid,
     338            uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT);
     339
     340    /**
     341     * +++ Legacy interface, converts old ariba messages into new reboost messages. +++
     342     */
    247343    NodeID sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
    248344            const LinkProperties& req = LinkProperties::DEFAULT);
    249345
    250         /**
    251          * Sends a message via an established link. If reliable transport was
    252          * selected, the method returns a sequence number and a communication event
    253          * is triggered on message delivery or loss.
    254          *
    255          * @param msg The message to be sent
    256          * @param lnk The link to be used for sending the message
    257          */
    258         seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk);
    259 
    260         /**
    261          * Sends a message to all known hosts in the overlay structure
    262          * the nodes that are reached here depend on the overlay structure.
    263          *
    264          * @param msg The message to be send
    265          * @param sid The id of the service that should receive the message
    266          * @see getNeighborNodes
    267          */
     346       
     347    /**
     348     * Sends a message to all known hosts in the overlay structure
     349     * the nodes that are reached here depend on the overlay structure.
     350     *
     351     * +++ New interface, using efficient zero-copy reboost messages. +++
     352     *
     353     * @param msg The message to be send
     354     * @param sid The id of the service that should receive the message
     355     * @see getNeighborNodes
     356     */
     357        void sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority=send_priority::NORMAL);
     358
     359    /**
     360     * +++ Legacy interface, converts old ariba messages into new reboost messages. +++
     361     */
    268362        void sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid);
    269363
     364       
     365        /* +++++ [Message sending] +++++ */
     366       
     367       
     368       
    270369        // --- communication listeners ---
    271370
     
    313412        /** @see Module.h */
    314413        string getName() const;
    315 
     414       
     415       
     416private:
     417        inline void check_send_priority(uint8_t priority);
     418
     419       
    316420protected:
    317421        // friends
     
    320424        // member variables
    321425        Name name;                             //< node name
    322         AribaModule& ariba_mod;                //< ariba module
     426//      AribaModule* ariba_mod;                //< ariba module
    323427        SpoVNetID spovnetId;                   //< current spovnet id
    324428        NodeID nodeId;                             //< current node id
     429        communication::BaseCommunication* base_communication;
    325430        overlay::BaseOverlay* base_overlay;    //< the base overlay
    326431
  • source/ariba/SideportListener.cpp

    r7468 r12060  
    4141#include "ariba/overlay/BaseOverlay.h"
    4242#include "ariba/overlay/LinkDescriptor.h"
    43 #include "ariba/utility/addressing/endpoint_set.hpp"
     43#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
    4444
    4545using ariba::overlay::LinkDescriptor;
     
    130130        if( overlay == NULL ) return (Protocol)ret;
    131131
    132         using namespace ariba::addressing;
    133 
    134132        LinkDescriptor* link = NULL;
    135133        BOOST_FOREACH( LinkDescriptor* lnk, overlay->links ){
     
    147145        if(bclink.isUnspecified() || bclink.remoteLocator == NULL) return (Protocol)ret;
    148146
    149         const address_v* locator = bclink.remoteLocator;
     147        addressing2::EndpointPtr locator = bclink.remoteLocator;
    150148
    151         if( locator->instanceof<tcpip_endpoint>() ){
    152                 tcpip_endpoint tcpip = *locator;
    153 
    154                 if( tcpip.address().is_v4() || tcpip.address().is_v4_mapped() ){
     149        if( locator->get_category() == addressing2::endpoint_category::TCPIP )
     150        {
     151        boost::shared_ptr<addressing2::tcpip_endpoint> endp =
     152                boost::dynamic_pointer_cast<addressing2::tcpip_endpoint>(locator);
     153           
     154                if( endp->to_asio().address().is_v4() )
     155                {
    155156                        ret = SideportListener::ipv4;
    156                 }else if( tcpip.address().is_v6() ){
     157                }else {
    157158                        ret = SideportListener::ipv6;
    158159                }
    159160
    160         }else if( locator->instanceof<rfcomm_endpoint>() ){
     161        }else if( locator->get_category() == addressing2::endpoint_category::BLUETOOTH ){
    161162                ret = SideportListener::rfcomm;
    162163        }
  • source/ariba/ariba.h

    r3374 r12060  
    4444 */
    4545
    46 #include "AribaModule.h"
    4746#include "CommunicationListener.h"
    4847#include "DataMessage.h"
  • source/ariba/communication/BaseCommunication.cpp

    r10767 r12060  
    5757namespace communication {
    5858
     59using namespace ariba::addressing2;
     60
    5961using ariba::utility::PeerID;
    6062using ariba::utility::SystemQueue;
    6163
    6264use_logging_cpp(BaseCommunication);
    63 
    64 /// adds an endpoint to the list
    65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {
    66         if (endpoint==NULL) return;
    67         BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
    68                 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
    69                         ref.count++;
    70                         return;
    71                 }
    72         }
    73         endpoint_reference ref;
    74         ref.endpoint = endpoint->clone();
    75         ref.count = 1;
    76         remote_endpoints.push_back(ref);
    77 }
    78 
    79 /// removes an endpoint from the list
    80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
    81         if (endpoint==NULL) return;
    82         for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
    83                 i != remote_endpoints.end(); i++) {
    84                 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
    85                         i->count--;
    86                         if (i->count==0) {
    87                                 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
    88                                 transport->terminate(i->endpoint);
    89                                 delete i->endpoint;
    90                                 remote_endpoints.erase(i);
    91                         }
    92                         return;
    93                 }
    94         }
    95 }
    9665
    9766
     
    10069          transport( NULL ),
    10170          messageReceiver( NULL ),
    102           started( false )
     71          started( false ),
     72          listenOn_endpoints(new addressing2::endpoint_set())
    10373{
    10474}
     
    10979
    11080
    111 void BaseCommunication::start() {
     81void BaseCommunication::start(EndpointSetPtr listen_on) {
     82    assert ( ! started );
     83   
     84    listenOn_endpoints = listen_on;
     85    logging_info("Setting local end-points: " << listenOn_endpoints->to_string());
     86   
    11287        logging_info( "Starting up ..." );
    11388        currentSeqnum = 0;
    11489
    115         // set local peer id
    116         localDescriptor.getPeerId() = PeerID::random();
    117         logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
    118 
    11990        // creating transports
     91        //  ---> transport_peer holds the set of the active endpoints we're listening on
    12092        logging_info( "Creating transports ..." );
    121 
    122 #ifdef UNDERLAY_OMNET
    123         AribaOmnetModule* module = StartupWrapper::getCurrentModule();
    124         module->setServerPort( listenport );
    125 
    126         transport = module;
    127         network = new OmnetNetworkProtocol( module );
    128 #else
    129         transport = new transport_peer( localDescriptor.getEndpoints() );
    130 #endif
     93        transport = new transport_peer();
     94        active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints);
     95        logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX
    13196
    13297        logging_info( "Searching for local locators ..." );
    133         /**
    134          * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0)
    135          * since addresses are used to initialize transport addresses
    136          */
    137         AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
    138         logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
    139 
     98        local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints);
     99        if ( local_endpoints->count() > 0 )
     100        {
     101            logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() );
     102        }
     103        else
     104        {
     105            logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!");
     106           
     107            // TODO notify application, so that it may react properly. throw exception..?
     108            assert( false );
     109        }
     110
     111       
     112    // create local EndpointDescriptor
     113        //  ---> localDescriptor hold the set endpoints that can be used to reach us
     114    localDescriptor.getPeerId() = PeerID::random();
     115    localDescriptor.replace_endpoint_set(local_endpoints);
     116    logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
     117
     118    // start transport_peer
    140119        transport->register_listener( this );
    141120        transport->start();
    142121
    143 #ifndef UNDERLAY_OMNET
    144122        // bind to the network change detection
    145123        networkMonitor.registerNotification( this );
    146 #endif
    147124
    148125        // base comm startup done
     
    163140bool BaseCommunication::isStarted(){
    164141        return started;
    165 }
    166 
    167 /// Sets the endpoints
    168 void BaseCommunication::setEndpoints( string& _endpoints ) {
    169         localDescriptor.getEndpoints().assign(_endpoints);
    170         logging_info("Setting local end-points: "
    171                 << localDescriptor.getEndpoints().to_string());
    172142}
    173143
     
    193163        addLink( ld );
    194164
    195         // send a message to request new link to remote
    196         logging_debug( "Send messages with request to open link to " << descriptor.toString() );
    197         AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
    198         baseMsg.getLocalDescriptor() = localDescriptor;
    199         baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
    200 
    201         // serialize and send message
    202         send( &baseMsg, descriptor );
     165
     166    /* send a message to request new link to remote */
     167    logging_debug( "Send messages with request to open link to " << descriptor.toString() );
     168
     169    /*
     170         * Create Link-Request Message:
     171         * NOTE: - Their PeerID (in parent message)
     172         * - Our LinkID
     173         * - Our PeerID
     174         * - Our EndpointDescriptor
     175         */
     176        reboost::message_t linkmsg;
     177        linkmsg.push_back(linkid.serialize());
     178        linkmsg.push_back(localDescriptor.getPeerId().serialize());
     179        linkmsg.push_back(localDescriptor.endpoints->serialize());
     180       
     181//      // XXX AKTUELL BUG FINDING...
     182//      reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize();
     183//      EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet();
     184//      xxx_set->deserialize(xxx);
     185//      cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl;
     186//      cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl;
     187       
     188        // send message
     189        // TODO move enum to BaseComm
     190        send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg,
     191                descriptor, system_priority::OVERLAY);
    203192
    204193        return linkid;
     
    217206
    218207        // tell the registered listeners
    219         BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
     208        foreach( CommunicationEvents* i, eventListener ) {
    220209                i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
    221210        }
    222211
    223         // create message to drop the link
     212
     213        // * send message to drop the link *
    224214        logging_debug( "Sending out link close request. for us, the link is closed now" );
    225         AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
    226 
    227         // send message to drop the link
    228         send( &msg, ld );
     215        reboost::message_t empty_message;
     216        send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY );
    229217
    230218        // remove from map
     
    232220}
    233221
    234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
    235 
    236         logging_debug( "Sending out message to link " << lid.toString() );
    237 
     222
     223seqnum_t BaseCommunication::sendMessage( const LinkID& lid,
     224        reboost::message_t message,
     225        uint8_t priority,
     226        bool bypass_overlay) throw(communication_message_not_sent)
     227{
     228    // message type: direct data or (normal) data
     229    AribaBaseMsg::type_ type;
     230    if ( bypass_overlay )
     231    {
     232        type = AribaBaseMsg::typeDirectData;
     233        logging_debug( "Sending out direct-message to link " << lid.toString() );
     234    }
     235    else
     236    {
     237        type = AribaBaseMsg::typeData;
     238        logging_debug( "Sending out message to link " << lid.toString() );
     239    }
     240
     241   
    238242        // query local link info
    239243        LinkDescriptor& ld = queryLocalLink(lid);
    240         if( ld.isUnspecified() ){
    241                 logging_error( "Don't know the link with id " << lid.toString() );
    242                 return -1;
     244        if( ld.isUnspecified() )
     245        {
     246            throw communication_message_not_sent("Don't know the link with id "
     247                    + lid.toString());
    243248        }
    244249
    245250        // link not up-> error
    246         if( !ld.up ) {
    247                 logging_error("Can not send on link " << lid.toString() << ": link not up");
    248                 return -1;
    249         }
    250 
    251         // create message
    252         AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
    253 
    254         // encapsulate the payload message
    255         msg.encapsulate( const_cast<Message*>(message) );
    256 
    257         // send message
    258         send( &msg, ld );
    259 
    260         // return sequence number
     251        if( !ld.up )
     252        {
     253            throw communication_message_not_sent("Can not send on link "
     254                    + lid.toString() + ": link not up");
     255        }
     256
     257
     258        // * send message *
     259        bool okay = send_over_link( type, message, ld, priority );
     260
     261        if ( ! okay )
     262        {
     263            throw communication_message_not_sent("send_over_link failed!");
     264        }
     265       
    261266        return ++currentSeqnum;
    262267}
     
    268273                LinkDescriptor& linkDesc = queryLocalLink(link);
    269274                if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
    270                 return linkDesc.remoteEndpoint;
     275                return linkDesc.remoteDescriptor;
    271276        }
    272277}
     
    283288}
    284289
    285 SystemEventType TransportEvent("Transport");
    286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
    287 
    288 /// called when a system event is emitted by system queue
    289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
    290 
    291         // dispatch received messages
    292         if ( event.getType() == MessageDispatchEvent ){
    293                 logging_debug( "Forwarding message receiver" );
    294                 boost::function0<void>* handler = event.getData< boost::function0<void> >();
    295                 (*handler)();
    296                 delete handler;
    297         }
    298 }
    299 
    300 /**
    301  * called within the ASIO thread
    302  * when a message is received from underlay transport
    303  */
     290
     291
     292/*------------------------------
     293 | ASIO thread --> SystemQueue |
     294 ------------------------------*/
     295
     296/// ASIO thread
    304297void BaseCommunication::receive_message(transport_connection::sptr connection,
    305         reboost::message_t msg) {
     298        reboost::shared_buffer_t msg) {
    306299
    307300        logging_debug( "Dispatching message" );
    308301       
    309     boost::function0<void>* handler = new boost::function0<void>(
     302    SystemQueue::instance().scheduleCall(
    310303            boost::bind(
    311304                    &BaseCommunication::receiveMessage,
     
    313306                    connection,
    314307                    msg)
    315     );
    316    
    317     SystemQueue::instance().scheduleEvent(
    318         SystemEvent(this, MessageDispatchEvent, handler)
    319     );
    320 }
    321 
    322 /**
    323  * called within the ARIBA thread (System Queue)
    324  * when a message is received from underlay transport
    325  */
     308        );
     309}
     310
     311/// ASIO thread
     312void BaseCommunication::connection_terminated(transport_connection::sptr connection)
     313{
     314    SystemQueue::instance().scheduleCall(
     315            boost::bind(
     316                    &BaseCommunication::connectionTerminated,
     317                    this,
     318                    connection)
     319        );
     320}
     321
     322/*--------------------------------
     323 | [ASIO thread --> SystemQueue] |
     324 -------------------------------*/
     325
     326/// ARIBA thread (System Queue)
     327void BaseCommunication::connectionTerminated(transport_connection::sptr connection)
     328{
     329    vector<LinkID*> links = connection->get_communication_links();
     330   
     331    logging_debug("[BaseCommunication] Connection terminated: "
     332            << connection->getLocalEndpoint()->to_string()
     333            << " <--> " << connection->getRemoteEndpoint()->to_string()
     334            << " (" << links.size() << " links)");
     335   
     336    // remove all links that used the terminated connection
     337    for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it )
     338    {
     339        LinkID& link_id = **it;
     340       
     341        logging_debug("  ---> Removing link: " << link_id.toString());
     342       
     343        // searching for link, not found-> warn
     344        LinkDescriptor& linkDesc = queryLocalLink( link_id );
     345        if (linkDesc.isUnspecified()) {
     346            logging_warn("Failed to find local link " << link_id.toString());
     347            continue;
     348        }
     349
     350        // inform listeners
     351        foreach( CommunicationEvents* i, eventListener ){
     352            i->onLinkFail( linkDesc.localLink,
     353                    linkDesc.localLocator, linkDesc.remoteLocator );
     354        }
     355
     356        // remove the link descriptor
     357        removeLink( link_id );
     358    }
     359}
     360
     361/// ARIBA thread (System Queue)
    326362void BaseCommunication::receiveMessage(transport_connection::sptr connection,
    327         reboost::message_t message)
     363        reboost::shared_buffer_t message)
    328364{
    329    
    330     //// Adapt to old message system ////
    331     // Copy data
    332     size_t bytes_len = message.size();
    333     uint8_t* bytes = new uint8_t[bytes_len];
    334     message.read(bytes, 0, bytes_len);
    335    
    336     Data data(bytes, bytes_len * 8);
    337    
    338     Message legacy_message;
    339     legacy_message.setPayload(data);
    340    
    341    
    342    
    343         /// decapsulate message
    344         AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
    345         logging_debug( "Receiving message of type " << msg->getTypeString() );
    346 
     365    // XXX
     366    logging_debug("/// [receiveMessage] buffersize: " << message.size());
     367       
     368    // get type
     369    uint8_t type = message.data()[0];
     370    reboost::shared_buffer_t sub_buff = message(1);
     371   
     372    // get link id
     373    LinkID link_id;
     374    if ( type != AribaBaseMsg::typeLinkRequest)
     375    {
     376        sub_buff = link_id.deserialize(sub_buff);
     377    }
     378   
    347379        // handle message
    348         switch (msg->getType()) {
    349 
     380        switch ( type )
     381        {
    350382                // ---------------------------------------------------------------------
    351383                // data message
    352384                // ---------------------------------------------------------------------
    353                 case AribaBaseMsg::typeData: {
    354                         logging_debug( "Received data message, forwarding to overlay" );
    355                         if( messageReceiver != NULL ) {
     385                case AribaBaseMsg::typeData:
     386                {
     387                        logging_debug( "Received data message, forwarding to overlay." );
     388                        if( messageReceiver != NULL )
     389                        {
    356390                                messageReceiver->receiveMessage(
    357                                         msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
     391                                        sub_buff, link_id, NodeID::UNSPECIFIED, false
    358392                                );
    359393                        }
     394                       
    360395                        break;
    361396                }
    362397
     398        // ---------------------------------------------------------------------
     399        // direct data message (bypass overlay-layer)
     400        // ---------------------------------------------------------------------
     401        case AribaBaseMsg::typeDirectData:
     402        {
     403            logging_debug( "Received direct data message, forwarding to application." );
     404           
     405            if( messageReceiver != NULL )
     406            {
     407                messageReceiver->receiveMessage(
     408                    sub_buff, link_id, NodeID::UNSPECIFIED, true
     409                );
     410            }
     411           
     412            break;
     413        }
     414
     415
     416       
    363417                // ---------------------------------------------------------------------
    364418                // handle link request from remote
    365419                // ---------------------------------------------------------------------
    366                 case AribaBaseMsg::typeLinkRequest: {
    367                         logging_debug( "Received link open request" );
    368 
    369                         /// not the correct peer id-> skip request
    370                         if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
    371                                 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
    372                                 logging_info("Received link request for "
    373                                         << msg->getRemoteDescriptor().getPeerId().toString()
    374                                         << "but i'm "
    375                                         << localDescriptor.getPeerId()
    376                                         << ": Ignoring!");
    377                                 break;
    378                         }
    379 
     420                case AribaBaseMsg::typeLinkRequest:
     421                {
     422                        logging_debug( "Received link open request on "
     423                                << connection->getLocalEndpoint()->to_string() );
     424                       
     425                        /*
     426                         * Deserialize Peer Message
     427                         * - Our PeerID
     428                         */
     429                        PeerID our_peer_id;
     430                        sub_buff = our_peer_id.deserialize(sub_buff);
     431
     432            /// not the correct peer id-> skip request
     433            if ( our_peer_id != localDescriptor.getPeerId() &&
     434                    ! our_peer_id.isUnspecified() /* overlay bootstrap */ )
     435            {
     436                logging_info("Received link request for "
     437                    << our_peer_id.toString()
     438                    << "but i'm "
     439                    << localDescriptor.getPeerId()
     440                    << ": Ignoring!");
     441               
     442                // TODO terminate connection?
     443               
     444                break;
     445            }
     446
     447           
     448                    /*
     449                     * Deserialize Link-Request Message:
     450                     * - Their LinkID
     451                     * - Their PeerID
     452                     * - Their EndpointDescriptor
     453                     */
     454            LinkID their_link_id;
     455            PeerID their_peer_id;
     456            EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
     457            sub_buff = their_link_id.deserialize(sub_buff);
     458            sub_buff = their_peer_id.deserialize(sub_buff);
     459            sub_buff = their_endpoints->deserialize(sub_buff);
     460            /* [ Deserialize Link-Request Message ] */
     461
     462           
    380463                        /// only answer the first request
    381                         if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
     464                        if (!queryRemoteLink(their_link_id).isUnspecified())
     465                        {
     466                           
     467                            // TODO aktuell: When will these connections be closed?
     468                            // ---> Close it now (if it services no links) ?
     469                            //   (see also ! allowlink below)
     470                           
     471                            // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only!
     472                            if ( connection->get_communication_links().size() == 0 )
     473                            {
     474                                connection->terminate();
     475                            }
     476                           
    382477                                logging_debug("Link request already received. Ignore!");
    383478                                break;
     
    386481                        /// create link ids
    387482                        LinkID localLink  = LinkID::create();
    388                         LinkID remoteLink = msg->getLocalLink();
     483                        LinkID remoteLink = their_link_id;  // XXX intermediate variable is unnecessary
    389484                        logging_debug(
    390485                                "local=" << connection->getLocalEndpoint()->to_string()
     
    394489                        // check if link creation is allowed by ALL listeners
    395490                        bool allowlink = true;
    396                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     491                        foreach( CommunicationEvents* i, eventListener ){
    397492                                allowlink &= i->onLinkRequest( localLink,
    398493                                        connection->getLocalEndpoint(),
     
    403498                        if( !allowlink ){
    404499                                logging_warn( "Overlay denied creation of link" );
    405                                 delete msg;
    406500                                return;
    407501                        }
     
    411505                        ld->localLink = localLink;
    412506                        ld->remoteLink = remoteLink;
    413                         ld->localLocator = connection->getLocalEndpoint()->clone();
    414                         ld->remoteLocator = connection->getRemoteEndpoint()->clone();
    415                         ld->connection = connection;
    416                         ld->remoteEndpoint = msg->getLocalDescriptor();
    417                         add_endpoint(ld->remoteLocator);
    418 
    419                         // add layer 1-3 addresses
    420                         ld->remoteEndpoint.getEndpoints().add(
    421                                 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    422                         localDescriptor.getEndpoints().add(
    423                                 connection->getLocalEndpoint(),
    424                                 endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
     507                        ld->localLocator = connection->getLocalEndpoint();
     508                        ld->remoteLocator = connection->getRemoteEndpoint();
     509                        ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints);
     510                        ld->set_connection(connection);
     511
     512           
     513                        // update endpoints (should only have any effect in case of NAT)
     514                        ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
     515//                      localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint());  // XXX 0.0.0.0:0
    425516
    426517                        // link is now up-> add it
     
    428519                        addLink(ld);
    429520
    430                         // link is up!
    431                         logging_debug( "Link (initiated from remote) is up with "
    432                                 << "local(id=" << ld->localLink.toString() << ","
    433                                 << "locator=" << ld->localLocator->to_string() << ") "
    434                                 << "remote(id=" << ld->remoteLink.toString() << ", "
    435                                 << "locator=" << ld->remoteLocator->to_string() << ")"
    436                         );
    437 
    438                         // sending link request reply
    439                         logging_debug( "Sending link request reply with ids "
    440                                 << "local=" << localLink.toString() << ", "
    441                                 << "remote=" << remoteLink.toString() );
    442                         AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
    443                         reply.getLocalDescriptor() = localDescriptor;
    444                         reply.getRemoteDescriptor() = ld->remoteEndpoint;
    445 
    446                         send( &reply, *ld );
     521
     522                       
     523            /* sending link reply */
     524            logging_debug( "Sending link reply with ids "
     525                << "local=" << localLink.toString() << ", "
     526                << "remote=" << remoteLink.toString() );
     527
     528                    /*
     529                     * Create Link-Reply Message:
     530                     * - Our LinkID
     531                     * - Our Endpoint_Set (as update)
     532                     * - Their EndpointDescriptor (maybe they learn something about NAT)
     533                     */
     534                    reboost::message_t linkmsg;
     535                    linkmsg.push_back(localLink.serialize());
     536                    linkmsg.push_back(localDescriptor.endpoints->serialize());
     537                    linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize());
     538                   
     539                    // XXX
     540                    cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl;
     541                   
     542                    // send message
     543                        bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY );
     544                       
     545                        if ( ! sent )
     546                        {
     547                            logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string());
     548                           
     549                            // TODO remove link, close link, ..?
     550                           
     551                            break;
     552                        }
     553
     554
     555            // link is up!
     556            logging_debug( "Link (initiated from remote) is up with "
     557                << "local(id=" << ld->localLink.toString() << ","
     558                << "locator=" << ld->localLocator->to_string() << ") "
     559                << "remote(id=" << ld->remoteLink.toString() << ", "
     560                << "locator=" << ld->remoteLocator->to_string() << ")"
     561            );
    447562
    448563                        // inform listeners about new open link
    449                         BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
     564                        foreach( CommunicationEvents* i, eventListener ) {
    450565                                i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
    451566                        }
     
    458573                // handle link request reply
    459574                // ---------------------------------------------------------------------
    460                 case AribaBaseMsg::typeLinkReply: {
     575                case AribaBaseMsg::typeLinkReply:
     576                {
    461577                        logging_debug( "Received link open reply for a link we initiated" );
    462578
     579            /*
     580             * Deserialize Link-Reply Message:
     581             * - Their LinkID
     582             * - Their Endpoint_Set (as update)
     583             * - Our EndpointDescriptor (maybe we can learn something about NAT)
     584             */
     585                        LinkID their_link_id;
     586                        EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
     587                        EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet();
     588                        sub_buff = their_link_id.deserialize(sub_buff);
     589                        sub_buff = their_endpoints->deserialize(sub_buff);
     590                        sub_buff = our_endpoints->deserialize(sub_buff);
     591
     592           
    463593                        // this is a reply to a link open request, so we have already
    464594                        // a link mapping and can now set the remote link to valid
    465                         LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
     595                        LinkDescriptor& ld = queryLocalLink( link_id );
    466596
    467597                        // no link found-> warn!
    468598                        if (ld.isUnspecified()) {
    469                                 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
    470                                 delete msg;
     599                                logging_warn("Failed to find local link " << link_id.toString());
    471600                                return;
    472601                        }
     602                       
     603                        if ( ld.up )
     604                        {
     605                logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString());
     606               
     607                // TODO send LinkClose ?
     608                return;                     
     609                        }
    473610
    474611                        // store the connection
    475                         ld.connection = connection;
     612                        ld.set_connection(connection);
    476613                       
    477614                        // set remote locator and link id
    478                         ld.remoteLink = msg->getLocalLink();
    479                         ld.remoteLocator = connection->getRemoteEndpoint()->clone();
    480                         ld.remoteEndpoint.getEndpoints().add(
    481                                                         msg->getLocalDescriptor().getEndpoints(),
    482                                                         endpoint_set::Layer1_4
    483                                                 );
    484 
    485                         localDescriptor.getEndpoints().add(
    486                                 msg->getRemoteDescriptor().getEndpoints(),
    487                                 endpoint_set::Layer1_3
    488                         );
     615                        ld.remoteLink = their_link_id;
     616                        ld.remoteLocator = connection->getRemoteEndpoint();
     617                       
     618                       
     619                        /* Update endpoints */
     620                        // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information.
     621                        ld.remoteDescriptor.replace_endpoint_set(their_endpoints);
     622                       
     623                        // add actual remote endpoint to this set (should only have any effect in case of NAT)
     624                        ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
     625                       
     626                        // TODO In case of NAT, we could learn something about our external IP.
     627                        //   ---> But we must trust the remote peer about this information!! 
     628//                      localDescriptor.endpoints->add_endpoints(our_endpoints);
     629                       
     630                       
     631                       
     632                       
    489633                        ld.up = true;
    490                         add_endpoint(ld.remoteLocator);
    491634
    492635                        logging_debug( "Link is now up with local id "
     
    496639
    497640                        // inform lisneters about link up event
    498                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     641                        foreach( CommunicationEvents* i, eventListener ){
    499642                                i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
    500643                        }
     
    509652                case AribaBaseMsg::typeLinkClose: {
    510653                        // get remote link
    511                         const LinkID& localLink = msg->getRemoteLink();
    512                         logging_debug( "Received link close request for link " << localLink.toString() );
     654//                      const LinkID& localLink = msg.getRemoteLink();
     655                        logging_debug( "Received link close request for link " << link_id.toString() );
    513656
    514657                        // searching for link, not found-> warn
    515                         LinkDescriptor& linkDesc = queryLocalLink( localLink );
     658                        LinkDescriptor& linkDesc = queryLocalLink( link_id );
    516659                        if (linkDesc.isUnspecified()) {
    517                                 logging_warn("Failed to find local link " << localLink.toString());
    518                                 delete msg;
     660                                logging_warn("Failed to find local link " << link_id.toString());
    519661                                return;
    520662                        }
    521663
    522664                        // inform listeners
    523                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     665                        foreach( CommunicationEvents* i, eventListener ){
    524666                                i->onLinkDown( linkDesc.localLink,
    525667                                                linkDesc.localLocator, linkDesc.remoteLocator );
     
    527669
    528670                        // remove the link descriptor
    529                         removeLink( localLink );
     671                        removeLink( link_id );
    530672
    531673                        // done
     
    534676
    535677                // ---------------------------------------------------------------------
    536                 // handle link locator changes
     678                // handle link locator changes  -- TODO is this ever called..?
    537679                // ---------------------------------------------------------------------
    538                 case AribaBaseMsg::typeLinkUpdate: {
    539                         const LinkID& localLink = msg->getRemoteLink();
    540                         logging_debug( "Received link update for link "
    541                                 << localLink.toString() );
    542 
    543                         // find the link description
    544                         LinkDescriptor& linkDesc = queryLocalLink( localLink );
    545                         if (linkDesc.isUnspecified()) {
    546                                 logging_warn("Failed to update local link "
    547                                         << localLink.toString());
    548                                 delete msg;
    549                                 return;
    550                         }
    551 
    552                         // update the remote locator
    553                         const address_v* oldremote = linkDesc.remoteLocator;
    554                         linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
    555 
    556                         // inform the listeners (local link has _not_ changed!)
    557                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
    558                                 i->onLinkChanged(
    559                                         linkDesc.localLink,     // linkid
    560                                         linkDesc.localLocator,  // old local
    561                                         linkDesc.localLocator,  // new local
    562                                         oldremote,              // old remote
    563                                         linkDesc.remoteLocator  // new remote
    564                                 );
    565                         }
    566 
    567                         // done
    568                         break;
    569                 }
    570         }
    571 
    572         delete msg;
     680//              case AribaBaseMsg::typeLinkUpdate: {
     681//                      const LinkID& localLink = msg.getRemoteLink();
     682//                      logging_debug( "Received link update for link "
     683//                              << localLink.toString() );
     684//
     685//                      // find the link description
     686//                      LinkDescriptor& linkDesc = queryLocalLink( localLink );
     687//                      if (linkDesc.isUnspecified()) {
     688//                              logging_warn("Failed to update local link "
     689//                                      << localLink.toString());
     690//                              return;
     691//                      }
     692//
     693//                      // update the remote locator
     694//                      addressing2::EndpointPtr oldremote = linkDesc.remoteLocator;
     695//                      linkDesc.remoteLocator = connection->getRemoteEndpoint();
     696//                     
     697//                      // TODO update linkDesc.connection ?
     698//
     699//                      // inform the listeners (local link has _not_ changed!)
     700//                      foreach( CommunicationEvents* i, eventListener ){
     701//                              i->onLinkChanged(
     702//                                      linkDesc.localLink,     // linkid
     703//                                      linkDesc.localLocator,  // old local
     704//                                      linkDesc.localLocator,  // new local
     705//                                      oldremote,                      // old remote
     706//                                      linkDesc.remoteLocator  // new remote
     707//                              );
     708//                      }
     709//
     710//                      // done
     711//                      break;
     712//              }
     713               
     714               
     715        default: {
     716            logging_warn( "Received unknown message type!" );
     717            break;
     718        }
     719
     720        }
    573721}
    574722
     
    582730        for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
    583731                if( (*i)->localLink != localLink) continue;
    584                 remove_endpoint((*i)->remoteLocator);
     732//              remove_endpoint((*i)->remoteLocator);  // XXX
    585733                delete *i;
    586734                linkSet.erase( i );
     
    605753}
    606754
    607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
    608         LinkIDs ids;
    609         for (size_t i=0; i<linkSet.size(); i++){
    610                 if( addr == NULL ){
    611                         ids.push_back( linkSet[i]->localLink );
    612                 } else {
    613                         if ( *linkSet[i]->remoteLocator == *addr )
    614                                 ids.push_back( linkSet[i]->localLink );
    615                 }
    616         }
    617         return ids;
    618 }
     755//LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
     756//      LinkIDs ids;
     757//      for (size_t i=0; i<linkSet.size(); i++){
     758//              if( addr == NULL ){
     759//                      ids.push_back( linkSet[i]->localLink );
     760//              } else {
     761//                      if ( *linkSet[i]->remoteLocator == *addr )
     762//                              ids.push_back( linkSet[i]->localLink );
     763//              }
     764//      }
     765//      return ids;
     766//}
    619767
    620768void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
    621 
    622 #ifdef UNDERLAY_OMNET
    623 
    624         // we have no mobility support for simulations
    625         return
    626 
    627 #endif // UNDERLAY_OMNET
    628769
    629770/*- disabled!
     
    762903}
    763904
    764 /// sends a message to all end-points in the end-point descriptor
    765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
    766         Data data = data_serialize(legacy_message, DEFAULT_V);
    767        
    768         //// Adapt to new message system ////
    769         // transfer data buffer ownership to the shared_buffer
    770     reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
    771        
    772         reboost::message_t message;
    773         message.push_back(buf);
    774        
    775         transport->send(endpoint.getEndpoints(), message);
    776 }
    777 
    778 /// sends a message to the remote locator inside the link descriptor
    779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
    780         if (desc.remoteLocator==NULL) return;
    781        
    782         Data data = data_serialize(legacy_message, DEFAULT_V);
    783    
    784     //// Adapt to new message system ////
    785     // transfer data buffer ownership to the shared_buffer
    786     reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
    787    
    788     reboost::message_t message;
    789     message.push_back(buf);
    790    
    791         desc.connection->send(message);
    792 }
     905
     906addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link(
     907        const LinkID& linkid)
     908{
     909    LinkDescriptor& ld = queryLocalLink(linkid);
     910   
     911    return ld.get_connection()->getLocalEndpoint();
     912}
     913
     914addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link(
     915        const LinkID& linkid)
     916{
     917    LinkDescriptor& ld = queryLocalLink(linkid);
     918   
     919    return ld.get_connection()->getRemoteEndpoint();
     920}
     921
     922
     923
     924bool BaseCommunication::send_over_link(
     925        const uint8_t type,
     926        reboost::message_t message,
     927        const LinkDescriptor& desc,
     928        const uint8_t priority)
     929{
     930    /*
     931     * Create Link Message:
     932     * - Type
     933     * - Their LinkID
     934     */
     935    // link id
     936    message.push_front(desc.remoteLink.serialize());
     937    // type
     938    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
     939    /* [ Create Link Message ] */
     940   
     941   
     942    /* send message */
     943    transport_connection::sptr conn = desc.get_connection();
     944    if ( ! conn )
     945    {
     946        cout << "/// MARIO: No connection!!" << endl;  // XXX debug
     947        return false;
     948    }
     949   
     950    // * send over connection *
     951    return conn->send(message, priority);
     952}
     953
     954void BaseCommunication::send_to_peer(
     955        const uint8_t type,
     956        const PeerID& peer_id,
     957        reboost::message_t message,
     958        const EndpointDescriptor& endpoint,
     959        const uint8_t priority )
     960{
     961    /*
     962     * Create Peer Message:
     963     * - Type
     964     * - Their PeerID
     965     */
     966    // peer id
     967    message.push_front(peer_id.serialize());
     968    // type
     969    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
     970   
     971   
     972    /* send message */
     973    transport->send(endpoint.getEndpoints(), message, priority);
     974}
     975
     976
     977
    793978
    794979}} // namespace ariba, communication
  • source/ariba/communication/BaseCommunication.h

    r10653 r12060  
    5050#include <boost/foreach.hpp>
    5151
     52#ifdef ECLIPSE_PARSER
     53    #define foreach(a, b) for(a : b)
     54#else
     55    #define foreach(a, b) BOOST_FOREACH(a, b)
     56#endif
     57
    5258// utilities
    5359#include "ariba/utility/types.h"
    54 #include "ariba/utility/messages.h"
     60#include "ariba/utility/messages/MessageReceiver.h"
    5561#include "ariba/utility/logging/Logging.h"
    5662#include "ariba/utility/misc/Demultiplexer.hpp"
     
    5864
    5965// new transport and addressing
    60 #include "ariba/utility/addressing/addressing.hpp"
    61 #include "ariba/utility/transport/transport.hpp"
    62 #include "ariba/utility/transport/transport_connection.hpp"
     66#include "ariba/utility/transport/transport_peer.hpp"
     67#include "ariba/utility/transport/interfaces/transport_connection.hpp"
     68#include "ariba/utility/transport/interfaces/transport_listener.hpp"
     69#include "ariba/utility/addressing2/endpoint.hpp"
    6370
    6471// communication
     
    7279#include "ariba/communication/networkinfo/NetworkInformation.h"
    7380
    74 // disabled
    75 //#ifndef UNDERLAY_OMNET
    76 //  #include "ariba/communication/modules/transport/tcp/TCPTransport.h"
    77 //  #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"
    78 //  using ariba::communication::IPv4NetworkProtocol;
    79 //  using ariba::communication::TCPTransport;
    80 //#endif
    81 
    8281namespace ariba {
    83   class SideportListener;
     82    class SideportListener;
    8483}
    8584
     
    8786namespace communication {
    8887
     88
     89class communication_message_not_sent: public std::runtime_error
     90{
     91public:
     92    /** Takes a character string describing the error.  */
     93    explicit communication_message_not_sent(const string& __arg)  :
     94        std::runtime_error(__arg)
     95    {
     96    }
     97   
     98    virtual ~communication_message_not_sent() throw() {}
     99};
     100
     101
     102
    89103using namespace std;
    90 using namespace ariba::addressing;
    91104using namespace ariba::transport;
    92105using namespace ariba::utility;
     
    102115 * protocols and addressing schemes.
    103116 *
    104  * @author Sebastian Mies, Christoph Mayer
     117 * @author Sebastian Mies, Christoph Mayer, Mario Hock
    105118 */
    106119class BaseCommunication:
    107120        public NetworkChangeInterface,
    108         public SystemEventListener,
    109121        public transport_listener {
    110122
     
    120132
    121133        /// Startup the base communication, start modules etc.
    122         void start();
     134        void start(addressing2::EndpointSetPtr listen_on);
    123135
    124136        /// Stops the base communication, stop modules etc.
    125137        void stop();
    126 
    127         /// Sets the endpoints
    128         void setEndpoints( string& endpoints );
    129138
    130139        /// Check whether the base communication has been started up
     
    147156         * @return A sequence number for this message
    148157         */
    149         seqnum_t sendMessage(const LinkID lid, const Message* message);
     158        seqnum_t sendMessage(const LinkID& lid,
     159                reboost::message_t message,
     160                uint8_t priority,
     161                bool bypass_overlay = false) throw(communication_message_not_sent);
    150162
    151163        /**
     
    164176         * @return List of LinkID
    165177         */
    166         LinkIDs getLocalLinks(const address_v* addr) const;
     178//      LinkIDs getLocalLinks(const address_v* addr) const;  // XXX aktuell
    167179
    168180        /**
     
    187199
    188200        void unregisterEventListener(CommunicationEvents* _events);
    189 
    190         /// called when a system event is emitted by system queue
    191         virtual void handleSystemEvent(const SystemEvent& event);
    192201
    193202        /**
     
    196205         */
    197206        virtual void receive_message(transport_connection::sptr connection,
    198                 reboost::message_t msg);
    199 
     207                reboost::shared_buffer_t msg);
     208
     209    /**
     210     * called within the ASIO thread
     211     * when a connection is terminated (e.g. TCP close)
     212     */
     213    virtual void connection_terminated(transport_connection::sptr connection);
     214
     215    addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid);
     216    addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid);
     217
     218       
    200219protected:
    201220
     
    205224         */
    206225        void receiveMessage(transport_connection::sptr connection,
    207                 reboost::message_t msg);
     226                reboost::shared_buffer_t message);
     227       
     228    /**
     229     * called within the ARIBA thread (System Queue)
     230     * when a connection is terminated (e.g. TCP close)
     231     */
     232    void connectionTerminated(transport_connection::sptr connection);
     233       
    208234
    209235        /// called when a network interface change happens
     
    221247                /// default constructor
    222248                LinkDescriptor() :
    223                         localLink(LinkID::UNSPECIFIED), localLocator(NULL),
    224                         remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),
     249                        localLink(LinkID::UNSPECIFIED),
     250                        remoteLink(LinkID::UNSPECIFIED),
    225251                        up(false) {
    226252                }
    227253
    228                 ~LinkDescriptor() {
    229                         if (localLocator!=NULL)  delete localLocator;
    230                         if (remoteLocator!=NULL) delete remoteLocator;
     254                ~LinkDescriptor()
     255                {
     256                        if ( connection )
     257                        {
     258                            connection->unregister_communication_link(&localLink);
     259                        }
    231260                }
    232261
     
    240269                        return *unspec;
    241270                }
     271               
     272               
     273                transport_connection::sptr get_connection() const
     274                {
     275                    return connection;
     276                }
     277               
     278                void set_connection(const transport_connection::sptr& conn)
     279                {
     280                    // unregister from old connection,
     281                    // if any (but normally there shouldn't..)
     282                    if ( connection )
     283                    {
     284                        connection->unregister_communication_link(&localLink);
     285                    }
     286
     287                    // * set_connection *
     288                    connection = conn;
     289                   
     290                    // register this link with the connection
     291                    conn->register_communication_link(&localLink);
     292                }
    242293
    243294                bool unspecified;
     
    245296                /// link identifiers
    246297                LinkID localLink;
    247                 const address_v* localLocator;
     298                addressing2::EndpointPtr localLocator;
    248299
    249300                /// used underlay addresses for the link
    250301                LinkID remoteLink;
    251                 const address_v* remoteLocator;
     302                addressing2::EndpointPtr remoteLocator;
    252303
    253304                /// the remote end-point descriptor
    254                 EndpointDescriptor remoteEndpoint;
     305                EndpointDescriptor remoteDescriptor;
    255306
    256307                /// flag, whether this link is up
    257308                bool up;
    258309               
     310               
     311        private:
    259312                /// connection if link is up
    260313                transport_connection::sptr connection;
     
    281334        /// The local end-point descriptor
    282335        EndpointDescriptor localDescriptor;
    283 
    284 #ifndef UNDERLAY_OMNET
     336       
     337        /**
     338         * endpoint_set holding the addresses of the "server"-sockets,
     339         * ---> that should be opened
     340         *
     341         * (e.g. 0.0.0.0:41322)
     342         */
     343        addressing2::EndpointSetPtr listenOn_endpoints;
     344       
     345    /**
     346     * endpoint_set holding the addresses of the "server"-sockets,
     347     * ---> that are actually open
     348     *
     349     * (e.g. 0.0.0.0:41322)
     350     *
     351     * XXX should only be in transport_peer
     352     */
     353        addressing2::EndpointSetPtr active_listenOn_endpoints;
     354       
     355    /**
     356     * endpoint_set holding the addresses of the "server"-sockets,
     357     * ---> here the discovered "addressable" addresses are stored
     358     *
     359     * (e.g. 192.168.0.5:41322)
     360     *
     361     * XXX should only be in localDescriptor
     362     */
     363        addressing2::EndpointSetPtr local_endpoints;
     364
    285365        /// network change detector
    286366        NetworkChangeDetection networkMonitor;
    287 #endif
    288367
    289368        /// list of all remote addresses of links to end-points
    290         class endpoint_reference {
    291         public:
    292                 int count; ///< the number of open links to this end-point
    293                 const address_v* endpoint; ///< the end-point itself
    294         };
    295         vector<endpoint_reference> remote_endpoints;
    296 
    297         /// adds an end-point to the list
    298         void add_endpoint( const address_v* endpoint );
    299 
    300         /// removes an end-point from the list
    301         void remove_endpoint( const address_v* endpoint );
     369        // XXX DEPRECATED
     370//      class endpoint_reference {
     371//      public:
     372//              int count; ///< the number of open links to this end-point
     373//              const address_v* endpoint; ///< the end-point itself
     374//      };
     375//      vector<endpoint_reference> remote_endpoints;
     376
     377        // XXX DEPRECATED
     378//      /// adds an end-point to the list
     379//      void add_endpoint( const address_v* endpoint );
     380//
     381//      /// removes an end-point from the list
     382//      void remove_endpoint( const address_v* endpoint );
    302383
    303384        /// event listener
     
    314395        MessageReceiver* messageReceiver;
    315396
    316         /// convenience: send message to peer
    317         void send( Message* message, const EndpointDescriptor& endpoint );
    318         void send( Message* message, const LinkDescriptor& descriptor );
     397       
     398        /*
     399         * Sends a message over an existing link.
     400         *   ---> Adds »Link Message« Header
     401         */
     402        bool send_over_link(
     403                const uint8_t type,
     404                reboost::message_t message,
     405                const LinkDescriptor& desc,
     406                const uint8_t priority);
     407
     408    /*
     409     * Sends a message to a known peer. (To all known endpoints.)
     410     *   ---> Adds »Peer Message« Header
     411     */
     412        void send_to_peer(
     413                const uint8_t type,
     414                const PeerID& peer_id,
     415                reboost::message_t message,
     416                const EndpointDescriptor& endpoint,
     417                const uint8_t priority );
     418       
    319419
    320420        /// state of the base communication
  • source/ariba/communication/CommunicationEvents.cpp

    r5284 r12060  
    4949
    5050bool CommunicationEvents::onLinkRequest(const LinkID& id,
    51         const address_v* local, const address_v* remote) {
     51        const addressing2::EndpointPtr local,
     52        const addressing2::EndpointPtr remote)
     53{
    5254        return true;
    5355}
    5456
    55 void CommunicationEvents::onLinkUp(const LinkID& id, const address_v* local,
    56         const address_v* remote) {
     57void CommunicationEvents::onLinkUp(const LinkID& id,
     58        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     59{
    5760}
    5861
    59 void CommunicationEvents::onLinkDown(const LinkID& id, const address_v* local,
    60         const address_v* remote) {
     62void CommunicationEvents::onLinkDown(const LinkID& id,
     63        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     64{
    6165}
    6266
    6367void CommunicationEvents::onLinkChanged(const LinkID& id,
    64         const address_v* oldlocal, const address_v* newlocal,
    65         const address_v* oldremote, const address_v* newremote) {
     68        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     69        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
     70{
    6671}
    6772
    68 void CommunicationEvents::onLinkFail(const LinkID& id, const address_v* local,
    69         const address_v* remote) {
     73void CommunicationEvents::onLinkFail(const LinkID& id,
     74        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     75{
    7076}
    7177
    7278void CommunicationEvents::onLinkQoSChanged(const LinkID& id,
    73         const address_v* local, const address_v* remote, const QoSParameterSet& qos) {
     79        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     80        const QoSParameterSet& qos)
     81{
    7482}
    7583
  • source/ariba/communication/CommunicationEvents.h

    r5284 r12060  
    4242#include "ariba/utility/types/LinkID.h"
    4343#include "ariba/utility/types/QoSParameterSet.h"
    44 #include "ariba/utility/addressing/addressing.hpp"
     44#include "ariba/utility/addressing2/endpoint.hpp"
    4545
    4646namespace ariba {
     
    4949using ariba::utility::LinkID;
    5050using ariba::utility::QoSParameterSet;
    51 using namespace ariba::addressing;
    5251
    5352class CommunicationEvents {
     
    6867         * @return True, if the link should be established
    6968         */
    70         virtual bool onLinkRequest(const LinkID& id, const address_v* local,
    71                 const address_v* remote);
     69        virtual bool onLinkRequest(const LinkID& id,
     70                const addressing2::EndpointPtr local,
     71                const addressing2::EndpointPtr remote);
    7272
    7373        /**
     
    7777         * @param id The link id of the established link
    7878         */
    79         virtual void onLinkUp(const LinkID& id, const address_v* local,
    80                 const address_v* remote);
     79        virtual void onLinkUp(const LinkID& id,
     80                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    8181
    8282        /**
     
    8585         * @param id The link identifier of the dropped link
    8686         */
    87         virtual void onLinkDown(const LinkID& id, const address_v* local,
    88                 const address_v* remote);
     87        virtual void onLinkDown(const LinkID& id,
     88                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    8989
    9090        /**
     
    9797         */
    9898        virtual void onLinkChanged(const LinkID& id,
    99                 const address_v* oldlocal,  const address_v* newlocal,
    100                 const address_v* oldremote, const address_v* newremote
    101         );
     99                const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     100                const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote);
    102101
    103         virtual void onLinkFail(const LinkID& id, const address_v* local,
    104                 const address_v* remote);
     102        virtual void onLinkFail(const LinkID& id,
     103                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    105104
    106         virtual void onLinkQoSChanged(const LinkID& id, const address_v* local,
    107                 const address_v* remote, const QoSParameterSet& qos);
     105        virtual void onLinkQoSChanged(const LinkID& id,
     106                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     107                const QoSParameterSet& qos);
    108108};
    109109
  • source/ariba/communication/EndpointDescriptor.cpp

    r5624 r12060  
    4242namespace communication {
    4343
    44 vsznDefault(EndpointDescriptor);
     44//vsznDefault(EndpointDescriptor);
    4545
    46 EndpointDescriptor::EndpointDescriptor(){
     46EndpointDescriptor::EndpointDescriptor()  :
     47        endpoints(new addressing2::endpoint_set())
     48{
    4749}
    4850
     
    5557}
    5658
    57 EndpointDescriptor::EndpointDescriptor(const endpoint_set& endpoints ) :
    58         endpoints(endpoints){
     59EndpointDescriptor::EndpointDescriptor(
     60        const PeerID& peer_id,
     61        addressing2::EndpointSetPtr endpoints ) :
     62    peerId(peer_id),
     63        endpoints(endpoints)
     64{
    5965}
    6066
    61 EndpointDescriptor::EndpointDescriptor(const string& str) : endpoints(str){
     67EndpointDescriptor::EndpointDescriptor(const string& str)  :
     68        endpoints(new addressing2::endpoint_set())
     69{
     70    cout << "ERROR: Construction of EndpointDescriptor from String is not functional!!" << endl;
     71    assert( false );
    6272}
    6373
     74
     75reboost::message_t EndpointDescriptor::serialize() const
     76{
     77    reboost::message_t msg;
     78    msg.push_back(peerId.serialize());
     79    msg.push_back(endpoints->serialize());
     80   
     81    return msg;
     82}
     83
     84reboost::shared_buffer_t EndpointDescriptor::deserialize(reboost::shared_buffer_t buff)
     85{
     86    buff = peerId.deserialize(buff);
     87    buff = endpoints->deserialize(buff);
     88   
     89    return buff;
     90}
     91
     92
    6493}} // namespace ariba, communication
  • source/ariba/communication/EndpointDescriptor.h

    r7744 r12060  
    4242#include <string>
    4343#include <set>
    44 #include "ariba/utility/serialization.h"
     44//#include "ariba/utility/serialization.h"
    4545#include "ariba/utility/types/PeerID.h"
    46 #include "ariba/utility/addressing/endpoint_set.hpp"
     46
     47#include "ariba/utility/addressing2/endpoint_set.hpp"
     48
     49// reboost messages
     50#include "ariba/utility/transport/messages/message.hpp"
     51
    4752
    4853namespace ariba {
     
    5156using_serialization;
    5257using namespace std;
    53 using namespace ariba::addressing;
    5458using ariba::utility::PeerID;
    5559
    56 class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE
    57         friend class BaseCommunication;
     60
     61/**
     62 * This class is used a transitions helper between the old addressing and
     63 * serialization to the new addressing2 and the new message classes
     64 *
     65 * Maybe it will be replaced, or at least modified in the future.
     66 */
     67//class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE
     68//    friend class BaseCommunication;
     69class EndpointDescriptor
     70{
     71    friend class BaseCommunication;
    5872
    5973public:
     
    6882
    6983        /// construct end-points from an endpoint set
    70         EndpointDescriptor(const endpoint_set& endpoints );
     84        EndpointDescriptor(const PeerID& peer_id, addressing2::EndpointSetPtr endpoints );
    7185
     86        // FIXME NOT WORKING !!
    7287        /// construct end-points from a string
    7388        EndpointDescriptor(const string& str);
     
    7590        /// convert end-points to string
    7691        string toString() const {
    77                 return endpoints.to_string();
     92                return endpoints->to_string();
    7893        }
    7994
     
    90105        }
    91106
    92         /// create endpoint
    93         static EndpointDescriptor* fromString(string str) {
    94                 return new EndpointDescriptor(str);
    95         }
     107//      /// create endpoint
     108//      static EndpointDescriptor* fromString(string str) {
     109//              return new EndpointDescriptor(str);
     110//      }
    96111
    97112        bool operator==(const EndpointDescriptor& rh) const {
     
    113128
    114129        /// returns the end-points of this descriptor
    115         endpoint_set& getEndpoints() {
     130        addressing2::const_EndpointSetPtr getEndpoints() const {
    116131                return endpoints;
    117132        }
    118 
    119         /// returns the end-points of this descriptor
    120         const endpoint_set& getEndpoints() const {
    121                 return endpoints;
     133       
     134        void replace_endpoint_set(addressing2::EndpointSetPtr new_endpoints)
     135        {
     136            endpoints = new_endpoints;
    122137        }
    123 
     138       
    124139        /// returns a reference to the peer id
    125140        PeerID& getPeerId() {
     
    132147                return peerId;
    133148        }
     149       
     150        /// returns a message with peerId and endpoints in it
     151        reboost::message_t serialize() const;
     152       
     153        /// deserialite peerId and endpoints
     154        reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff);
     155       
    134156private:
    135         endpoint_set endpoints;
     157        addressing2::EndpointSetPtr endpoints;
    136158        PeerID peerId;
    137159};
     
    139161}} // namespace ariba, communication
    140162
    141 sznBeginDefault( ariba::communication::EndpointDescriptor, X ){
    142 
    143         // serialize peer id
    144         X && &peerId;
    145 
    146         // serialize end-points
    147         uint16_t len = endpoints.to_bytes_size();
    148         X && len;
    149         uint8_t* buffer = X.bytes( len );
    150         if (buffer!=NULL) {
    151                 if (X.isDeserializer()) endpoints.assign(buffer,len);
    152                 else endpoints.to_bytes(buffer);
    153         }
    154 }sznEnd();
     163//sznBeginDefault( ariba::communication::EndpointDescriptor, X ){
     164//
     165//    // TODO
     166//    assert(false);
     167//   
     168//      // serialize peer id
     169//      X && &peerId;
     170//
     171//      // serialize end-points
     172//      uint16_t len = endpoints.to_bytes_size();
     173//      X && len;
     174//      uint8_t* buffer = X.bytes( len );
     175//      if (buffer!=NULL) {
     176//              if (X.isDeserializer()) endpoints.assign(buffer,len);
     177//              else endpoints.to_bytes(buffer);
     178//      }
     179//}sznEnd();
    155180
    156181#endif /*ENDPOINTDESCRIPTOR_H_*/
  • source/ariba/communication/messages/AribaBaseMsg.h

    r9694 r12060  
    4242#include <string>
    4343#include <boost/cstdint.hpp>
    44 #include "ariba/utility/messages.h"
     44//#include "ariba/utility/messages.h"
     45#include "ariba/utility/messages/Message.h"
    4546#include "ariba/utility/serialization.h"
    4647#include "ariba/utility/types/LinkID.h"
     
    6162using_serialization;
    6263
     64// XXX This whole message is DEPRECATED
    6365class AribaBaseMsg : public Message {
    6466        VSERIALIZEABLE;
     
    6971                typeLinkReply = 2,
    7072                typeLinkClose = 3,
    71                 typeLinkUpdate = 4
     73                typeLinkUpdate = 4,
     74                typeDirectData = 5
    7275        };
    7376
     
    115118
    116119sznBeginDefault( ariba::communication::AribaBaseMsg, X ) {
    117         X && type && &localLink && &remoteLink;
     120        X && type && &remoteLink;
    118121        if (type == typeLinkReply || type == typeLinkRequest)
    119                 X && localDescriptor && remoteDescriptor;
    120         X && Payload();
     122                X && &localLink && localDescriptor && remoteDescriptor;
     123//      X && Payload();
    121124} sznEnd();
    122125
  • source/ariba/communication/messages/CMakeLists.txt

    r10700 r12060  
    3737# [License]
    3838
    39 add_headers(AribaBaseMsg.h)
     39#add_headers(AribaBaseMsg.h)
    4040
    41 add_sources(AribaBaseMsg.cpp)
     41#add_sources(AribaBaseMsg.cpp)
  • source/ariba/communication/networkinfo/AddressDiscovery.cpp

    r10700 r12060  
    4949#include <ifaddrs.h>
    5050
     51#include <string>
     52#include <boost/asio/ip/address.hpp>
     53#include <boost/foreach.hpp>
     54
     55#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     56#include "ariba/utility/addressing/mac_address.hpp"
     57
    5158#ifdef HAVE_LIBBLUETOOTH
    5259  #include <bluetooth/bluetooth.h>
     
    5865namespace communication {
    5966
    60 mac_address AddressDiscovery::getMacFromIF( const char* name ) {
     67
     68using namespace std;
     69using namespace addressing2;
     70using namespace boost::asio::ip;
     71
     72using ariba::addressing::mac_address;
     73
     74mac_address getMacFromIF( const char* name )
     75{
    6176        mac_address addr;
    6277#ifdef HAVE_LIBBLUETOOTH
     
    7388}
    7489
    75 int AddressDiscovery::dev_info(int s, int dev_id, long arg) {
    76 #ifdef HAVE_LIBBLUETOOTH
    77         endpoint_set* set = (endpoint_set*)arg;
    78         struct hci_dev_info di;
    79         memset(&di, 0, sizeof(struct hci_dev_info));
    80         di.dev_id = dev_id;
    81         if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0;
    82         mac_address mac;
    83         mac.bluetooth( di.bdaddr );
    84         address_vf vf = mac;
    85         set->add(vf);
     90int dev_info(int s, int dev_id, long arg)
     91{
     92#ifdef HAVE_LIBBLUETOOTH
     93//      endpoint_set* set = (endpoint_set*)arg;
     94//      struct hci_dev_info di;
     95//      memset(&di, 0, sizeof(struct hci_dev_info));
     96//      di.dev_id = dev_id;
     97//      if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0;
     98//      mac_address mac;
     99//      mac.bluetooth( di.bdaddr );
     100//      address_vf vf = mac;
     101//      set->add(vf);
    86102#endif
    87103        return 0;
    88104}
    89105
    90 void AddressDiscovery::discover_bluetooth( endpoint_set& endpoints ) {
    91 #ifdef HAVE_LIBBLUETOOTH
    92         hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints );
    93 #endif
    94 }
    95 
    96 void AddressDiscovery::discover_ip_addresses( endpoint_set& endpoints ) {
    97         struct ifaddrs* ifaceBuffer = NULL;
    98         void*           tmpAddrPtr  = NULL;
    99 
    100         int ret = getifaddrs( &ifaceBuffer );
    101         if( ret != 0 ) return;
    102 
    103         for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
    104 
    105                 // ignore devices that are disabled or have no ip
    106                 if(i == NULL) continue;
    107                 struct sockaddr* addr = i->ifa_addr;
    108                 if (addr==NULL) continue;
    109 
    110                 // ignore tun devices
    111                 string device = string(i->ifa_name);
    112                 if(device.find_first_of("tun") == 0) continue;
    113 
    114                 if (addr->sa_family == AF_INET) {
    115                         // look for ipv4
    116                         char straddr[INET_ADDRSTRLEN];
    117                         tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr;
    118                         inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
    119                         ip_address ip = straddr;
    120                         if (ip.is_loopback()) continue;
    121                         address_vf vf = ip;
    122                         endpoints.add( vf );
    123                 } else
    124                 if (addr->sa_family == AF_INET6) {
    125                         // look for ipv6
    126                         char straddr[INET6_ADDRSTRLEN];
    127                         tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
    128                         inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
    129                         ip_address ip = straddr;
    130                         if (ip.is_loopback()) continue;
    131 //                      if (ip.is_link_local()) continue;
    132                         address_vf vf = ip;
    133                         endpoints.add( vf );
    134                 }
    135         }
    136 
    137         freeifaddrs(ifaceBuffer);
    138 }
    139 
    140 void AddressDiscovery::discover_endpoints( endpoint_set& endpoints ) {
    141         discover_ip_addresses( endpoints );
    142         discover_bluetooth( endpoints );
     106void discover_bluetooth(
     107        EndpointSetPtr listenOn_endpoints,
     108        EndpointSetPtr discovered_endpoints )
     109{
     110#ifdef HAVE_LIBBLUETOOTH
     111    // FIXME aktuell bluetooth
     112//      hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints );
     113#endif
     114}
     115
     116void discover_ip_addresses(
     117        EndpointSetPtr listenOn_endpoints,
     118        EndpointSetPtr discovered_endpoints )
     119{
     120    bool discover_ipv4 = false;
     121    bool discover_ipv6 = false;
     122    vector<uint16_t> ipv4_ports;
     123    vector<uint16_t> ipv6_ports;
     124   
     125    /* analyze listenOn_endpoints */
     126    BOOST_FOREACH( TcpIP_EndpointPtr endp, listenOn_endpoints->get_tcpip_endpoints() )
     127    {
     128        // BRANCH: IPv4 any [0.0.0.0]
     129        if ( endp->to_asio().address() == address_v4::any() )
     130        {
     131            // add port
     132            ipv4_ports.push_back(endp->to_asio().port());
     133           
     134            discover_ipv4 = true;
     135        }
     136
     137        // BRANCH: IPv6 any [::]
     138        else if ( endp->to_asio().address() == address_v6::any() )
     139        {
     140            // add port
     141            ipv6_ports.push_back(endp->to_asio().port());
     142           
     143            discover_ipv6 = true;
     144
     145           
     146            // NOTE: on linux the ipv6-any address [::] catches ipv4 as well
     147            ipv4_ports.push_back(endp->to_asio().port());
     148            discover_ipv4 = true;
     149        }
     150       
     151        // BRANCH: explicit ip address
     152        else
     153        {
     154            // ---> don't discover anything, just add it directly
     155            discovered_endpoints->add_endpoint(endp);
     156        }
     157    }
     158   
     159   
     160    /* discover addresses */
     161    if ( discover_ipv4 || discover_ipv6 )
     162    {
     163        struct ifaddrs* ifaceBuffer = NULL;
     164        void*           tmpAddrPtr  = NULL;
     165   
     166        int ret = getifaddrs( &ifaceBuffer );
     167        if( ret != 0 ) return;
     168   
     169        for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next )
     170        {
     171            // ignore devices that are disabled or have no ip
     172            if(i == NULL) continue;
     173            struct sockaddr* addr = i->ifa_addr;
     174            if (addr==NULL) continue;
     175   
     176    //          // ignore tun devices  // XXX why?
     177    //          string device = string(i->ifa_name);
     178    //          if(device.find_first_of("tun") == 0) continue;
     179
     180            // IPv4
     181            if ( discover_ipv4 && addr->sa_family == AF_INET )
     182            {
     183                char straddr[INET_ADDRSTRLEN];
     184                tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr;
     185                inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     186               
     187                address ip_addr = address::from_string(straddr);
     188   
     189                // skip loopback address
     190                if ( ip_addr.to_v4() == address_v4::loopback() )
     191                    continue;
     192
     193                // add endpoint for this address and every given ipv4 port
     194                BOOST_FOREACH( uint16_t port, ipv4_ports )
     195                {
     196                    tcp::endpoint tcpip_endp(ip_addr, port);
     197                    TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp));
     198                   
     199                    discovered_endpoints->add_endpoint(endp);
     200                }
     201            }
     202           
     203            // IPv6
     204            else if ( discover_ipv6 && addr->sa_family == AF_INET6 )
     205            {
     206                // look for ipv6
     207                char straddr[INET6_ADDRSTRLEN];
     208                tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
     209                inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     210               
     211                address ip_addr = address::from_string(straddr);
     212               
     213                // skip loopback address
     214                if ( ip_addr.to_v6() == address_v6::loopback() )
     215                    continue;
     216
     217                // add endpoint for this address and every given ipv4 port
     218                BOOST_FOREACH( uint16_t port, ipv6_ports )
     219                {
     220                    tcp::endpoint tcpip_endp(ip_addr, port);
     221                    TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp));
     222                   
     223                    discovered_endpoints->add_endpoint(endp);
     224                }
     225            }
     226        }
     227   
     228        freeifaddrs(ifaceBuffer);
     229    }
     230}
     231
     232
     233
     234EndpointSetPtr AddressDiscovery::discover_endpoints(EndpointSetPtr listenOn_endpoints)
     235{
     236    EndpointSetPtr discovered_endpoints(new addressing2::endpoint_set());
     237   
     238        discover_ip_addresses( listenOn_endpoints, discovered_endpoints );
     239        discover_bluetooth( listenOn_endpoints, discovered_endpoints );
     240       
     241        return discovered_endpoints;
    143242}
    144243
  • source/ariba/communication/networkinfo/AddressDiscovery.h

    r5639 r12060  
    4040#define __ADDRESS_DISCOVERY_H
    4141
    42 #include "ariba/utility/addressing/addressing.hpp"
    43 
    44 using namespace ariba::addressing;
     42#include "ariba/utility/addressing2/endpoint_set.hpp"
    4543
    4644namespace ariba {
    4745namespace communication {
    4846
     47using addressing2::EndpointSetPtr;
     48
    4949class AddressDiscovery {
    5050public:
    51         static void discover_endpoints( endpoint_set& endpoints );
     51        static EndpointSetPtr discover_endpoints(EndpointSetPtr listenOn_endpoints);
    5252
    5353private:
    54         static mac_address getMacFromIF( const char* name );
    55         static int dev_info(int s, int dev_id, long arg);
    56         static void discover_bluetooth( endpoint_set& endpoints );
    57         static void discover_ip_addresses( endpoint_set& endpoints );
     54        // TODO aktuell weg damit..
     55//      static mac_address getMacFromIF( const char* name );
     56//      static int dev_info(int s, int dev_id, long arg);
     57//      static void discover_bluetooth( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints );
     58//      static void discover_ip_addresses( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints );
    5859};
    5960
  • source/ariba/overlay/BaseOverlay.cpp

    r10653 r12060  
    5757#include "ariba/utility/visual/DddVis.h"
    5858#include "ariba/utility/visual/ServerVis.h"
     59#include <ariba/utility/misc/sha1.h>
    5960
    6061namespace ariba {
    6162namespace overlay {
     63
     64using namespace std;
     65using ariba::transport::system_priority;
    6266
    6367#define visualInstance          ariba::utility::DddVis::instance()
    6468#define visualIdOverlay         ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
    6569#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
     70
     71
     72// time constants (in seconds)
     73#define KEEP_ALIVE_TIME         60                      // send keep-alive message after link is not used for #s 
     74
     75#define LINK_ESTABLISH_TIME_OUT 10                      // timeout: link requested but not up
     76#define KEEP_ALIVE_TIME_OUT     KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT     // timeout: no data received on this link (incl. keep-alive messages)
     77#define AUTO_LINK_TIME_OUT      KEEP_ALIVE_TIME_OUT                    // timeout: auto link not used for #s
    6678
    6779
     
    8597
    8698LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
    87         BOOST_FOREACH( LinkDescriptor* lp, links )
     99        foreach( LinkDescriptor* lp, links )
    88100                                if ((communication ? lp->communicationId : lp->overlayId) == link)
    89101                                        return lp;
     
    92104
    93105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
    94         BOOST_FOREACH( const LinkDescriptor* lp, links )
     106        foreach( const LinkDescriptor* lp, links )
    95107                                if ((communication ? lp->communicationId : lp->overlayId) == link)
    96108                                        return lp;
     
    122134
    123135/// returns a auto-link descriptor
    124 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
     136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
     137{
    125138        // search for a descriptor that is already up
    126         BOOST_FOREACH( LinkDescriptor* lp, links )
    127                                 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
    128                                         return lp;
     139        foreach( LinkDescriptor* lp, links )
     140    {
     141        if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
     142            return lp;
     143    }
     144       
    129145        // if not found, search for one that is about to come up...
    130         BOOST_FOREACH( LinkDescriptor* lp, links )
    131         if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
    132                 return lp;
     146        foreach( LinkDescriptor* lp, links )
     147        {
     148            time_t now = time(NULL);
     149           
     150        if (lp->autolink && lp->remoteNode == node && lp->service == service
     151                && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
     152            return lp;
     153        }
     154       
    133155        return NULL;
    134156}
     
    136158/// stabilizes link information
    137159void BaseOverlay::stabilizeLinks() {
    138         // send keep-alive messages over established links
    139         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     160    time_t now = time(NULL);
     161   
     162    // send keep-alive messages over established links
     163        foreach( LinkDescriptor* ld, links )
     164    {
    140165                if (!ld->up) continue;
    141                 OverlayMsg msg( OverlayMsg::typeLinkAlive,
    142                                 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
    143                 if (ld->relayed) msg.setRouteRecord(true);
    144                 send_link( &msg, ld->overlayId );
     166               
     167                if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
     168                {
     169                    logging_debug("[BaseOverlay] Sending KeepAlive over "
     170                            << ld->to_string()
     171                            << " after "
     172                            << difftime( now, ld->keepAliveSent )
     173                            << "s");
     174                   
     175            OverlayMsg msg( OverlayMsg::typeKeepAlive,
     176                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
     177            msg.setRouteRecord(true);
     178            ld->keepAliveSent = now;
     179            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
     180                }
    145181        }
    146182
    147183        // iterate over all links and check for time boundaries
    148184        vector<LinkDescriptor*> oldlinks;
    149         time_t now = time(NULL);
    150         BOOST_FOREACH( LinkDescriptor* ld, links ) {
    151 
    152                 // keep alives and not up? yes-> link connection request is stale!
    153                 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
    154 
    155                         // increase counter
    156                         ld->keepAliveMissed++;
    157 
    158                         // missed more than four keep-alive messages (10 sec)? -> drop link
    159                         if (ld->keepAliveMissed > 4) {
    160                                 logging_info( "Link connection request is stale, closing: " << ld );
    161                                 oldlinks.push_back( ld );
    162                                 continue;
    163                         }
     185        foreach( LinkDescriptor* ld, links ) {
     186
     187                // link connection request stale?
     188                if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT )  // NOTE: keepAliveReceived == now, on connection request
     189                {
     190            logging_info( "Link connection request is stale, closing: " << ld );
     191            ld->failed = true;
     192            oldlinks.push_back( ld );
     193            continue;
    164194                }
    165195
    166196                if (!ld->up) continue;
    167197
     198               
     199               
     200               
    168201                // check if link is relayed and retry connecting directly
     202                // TODO Mario: What happens here?  --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
    169203                if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
    170204                        ld->retryCounter--;
     
    173207
    174208                // remote used as relay flag
    175                 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
     209                if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT)  // TODO is this a reasonable timeout ??
    176210                        ld->relaying = false;
    177211
     
    183217
    184218                // auto-link time exceeded?
    185                 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
     219                if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
    186220                        oldlinks.push_back( ld );
    187221                        continue;
     
    189223
    190224                // keep alives missed? yes->
    191                 if ( difftime( now, ld->keepAliveTime ) > 4 ) {
    192 
    193                         // increase counter
    194                         ld->keepAliveMissed++;
    195 
    196                         // missed more than four keep-alive messages (4 sec)? -> drop link
    197                         if (ld->keepAliveMissed >= 2) {
    198                                 logging_info( "Link is stale, closing: " << ld );
    199                                 oldlinks.push_back( ld );
    200                                 continue;
    201                         }
     225                if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
     226                {
     227            logging_info( "Link is stale, closing: " << ld );
     228            ld->failed = true;
     229            oldlinks.push_back( ld );
     230            continue;
    202231                }
    203232        }
    204233
    205234        // drop links
    206         BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
     235        foreach( LinkDescriptor* ld, oldlinks ) {
    207236                logging_info( "Link timed out. Dropping " << ld );
    208237                ld->relaying = false;
     
    210239        }
    211240
    212         // show link state
    213         counter++;
    214         if (counter>=4) showLinks();
    215         if (counter>=4 || counter<0) counter = 0;
     241       
     242       
     243       
     244        // show link state  (debug output)
     245        if (counter>=10 || counter<0)
     246    {
     247            showLinks();
     248            counter = 0;
     249    }
     250        else
     251        {
     252            counter++;
     253        }
    216254}
    217255
     
    230268
    231269                int i=0;
    232                 BOOST_FOREACH( LinkDescriptor* ld, links ) {
    233                         if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
     270                foreach( LinkDescriptor* ld, links ) {
     271                        if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
    234272                        bool found = false;
    235                         BOOST_FOREACH(NodeID& id, nodes)
     273                        foreach(NodeID& id, nodes)
    236274                        if (id  == ld->remoteNode) found = true;
    237275                        if (found) continue;
     
    261299        int i=0;
    262300        logging_info("--- link state -------------------------------");
    263         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     301        foreach( LinkDescriptor* ld, links ) {
    264302                string epd = "";
    265                 if (ld->isDirectVital())
    266                         epd = getEndpointDescriptor(ld->remoteNode).toString();
     303                if (isLinkDirectVital(ld))
     304                {
     305//                      epd = getEndpointDescriptor(ld->remoteNode).toString();
     306                   
     307                    epd = "Connection: ";
     308                    epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
     309                    epd += " <---> ";
     310                    epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
     311                }
    267312
    268313                logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
     
    289334// internal message delivery ---------------------------------------------------
    290335
     336
     337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
     338{
     339    // set priority
     340    message->setPriority(priority);
     341   
     342    // wrap old OverlayMsg into reboost message
     343    reboost::message_t wrapped_message = message->wrap_up_for_sending();
     344   
     345    // send down to BaseCommunication
     346    try
     347    {
     348        // * send *
     349        return bc->sendMessage(bc_link, wrapped_message, priority, false);
     350    }
     351    catch ( communication::communication_message_not_sent& e )
     352    {
     353        ostringstream out;
     354        out << "Communication message not sent: " << e.what();
     355        throw message_not_sent(out.str());
     356    }
     357   
     358    throw logic_error("This should never happen!");
     359}
     360
     361
    291362/// routes a message to its destination node
    292 void BaseOverlay::route( OverlayMsg* message ) {
    293 
     363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
     364{
    294365        // exceeded time-to-live? yes-> drop message
    295         if (message->getNumHops() > message->getTimeToLive()) {
    296                 logging_warn("Message exceeded TTL. Dropping message and relay routes"
    297                                 "for recovery.");
     366        if (message->getNumHops() > message->getTimeToLive())
     367        {
     368                logging_warn("Message exceeded TTL. Dropping message and relay routes "
     369            << "for recovery. Hop count: " << (int) message->getNumHops());
    298370                removeRelayNode(message->getDestinationNode());
    299371                return;
     
    301373
    302374        // no-> forward message
    303         else {
     375        else
     376        {
    304377                // destinastion myself? yes-> handle message
    305                 if (message->getDestinationNode() == nodeId) {
    306                         logging_warn("Usually I should not route messages to myself!");
    307                         Message msg;
    308                         msg.encapsulate(message);
    309                         handleMessage( &msg, NULL );
    310                 } else {
    311                         // no->send message to next hop
    312                         send( message, message->getDestinationNode() );
    313                 }
    314         }
     378                if (message->getDestinationNode() == nodeId)
     379                {
     380                        logging_warn("Usually I should not route messages to myself. And I won't!");
     381                }
     382
     383                // no->send message to next hop
     384                else
     385                {
     386                    try
     387                    {
     388                /* (deep) packet inspection to determine priority */
     389                // BRANCH: typeData  -->  send with low priority
     390                if ( message->getType() == OverlayMsg::typeData )
     391                {
     392                    // TODO think about implementing explicit routing queue (with active queue management??)
     393                    send( message,
     394                          message->getDestinationNode(),
     395                          message->getPriority(),
     396                          last_hop );
     397                }
     398                // BRANCH: internal message  -->  send with higher priority
     399                else
     400                {
     401                    send( message,
     402                          message->getDestinationNode(),
     403                          system_priority::HIGH,
     404                          last_hop );
     405                }
     406                    }
     407                    catch ( message_not_sent& e )
     408                    {
     409                        logging_warn("Unable to route message of type "
     410                                << message->getType()
     411                                << " to "
     412                                << message->getDestinationNode()
     413                                << ". Reason: "
     414                                << e.what());
     415                       
     416                        // inform sender
     417                if ( message->getType() != OverlayMsg::typeMessageLost )
     418                {
     419                    report_lost_message(message);
     420                }
     421                    }
     422                }
     423        }
     424}
     425
     426void BaseOverlay::report_lost_message( const OverlayMsg* message )
     427{
     428    OverlayMsg reply(OverlayMsg::typeMessageLost);
     429    reply.setSeqNum(message->getSeqNum());
     430   
     431    /**
     432     * MessageLost-Message
     433     *
     434     * - Type of lost message
     435     * - Hop count of lost message
     436     * - Source-LinkID  of lost message
     437     */
     438    reboost::shared_buffer_t b(sizeof(uint8_t)*2);
     439    b.mutable_data()[0] = message->getType();
     440    b.mutable_data()[1] = message->getNumHops();
     441    reply.append_buffer(b);
     442    reply.append_buffer(message->getSourceLink().serialize());
     443   
     444    try
     445    {
     446        send_node(&reply, message->getSourceNode(),
     447                system_priority::OVERLAY,
     448                OverlayInterface::OVERLAY_SERVICE_ID);
     449    }
     450    catch ( message_not_sent& e )
     451    {
     452        logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
     453    }
    315454}
    316455
    317456/// sends a message to another node, delivers it to the base overlay class
    318 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
     457seqnum_t BaseOverlay::send( OverlayMsg* message,
     458        const NodeID& destination,
     459        uint8_t priority,
     460        const NodeID& last_hop ) throw(message_not_sent)
     461{
    319462        LinkDescriptor* next_link = NULL;
    320463
    321464        // drop messages to unspecified destinations
    322         if (destination.isUnspecified()) return -1;
    323 
    324         // send messages to myself -> handle message and drop warning!
    325         if (destination == nodeId) {
    326                 logging_warn("Sent message to myself. Handling message.")
    327                 Message msg;
    328                 msg.encapsulate(message);
    329                 handleMessage( &msg, NULL );
    330                 return -1;
     465        if (destination.isUnspecified())
     466            throw message_not_sent("No destination specified. Drop!");
     467
     468        // send messages to myself -> drop!
     469    // TODO maybe this is not what we want. why not just deliver this message?
     470    //   There is a similar check in the route function, there it should be okay.
     471        if (destination == nodeId)
     472        {
     473            logging_warn("Sent message to myself. Drop!");
     474           
     475            throw message_not_sent("Sent message to myself. Drop!");
    331476        }
    332477
    333478        // use relay path?
    334         if (message->isRelayed()) {
     479        if (message->isRelayed())
     480        {
    335481                next_link = getRelayLinkTo( destination );
    336                 if (next_link != NULL) {
     482               
     483                if (next_link != NULL)
     484                {
    337485                        next_link->setRelaying();
    338                         return bc->sendMessage(next_link->communicationId, message);
    339                 } else {
    340                         logging_warn("Could not send message. No relay hop found to "
    341                                         << destination << " -- trying to route over overlay paths ...")
    342 //                      logging_error("ERROR: " << debugInformation() );
    343                 //                      return -1;
    344                 }
    345         }
    346 
     486
     487                        // * send message over relayed link *
     488                        return send_overlaymessage_down(message, next_link->communicationId, priority);
     489                }
     490                else
     491                {
     492                        logging_warn("No relay hop found to " << destination
     493                                << " -- trying to route over overlay paths ...")
     494                }
     495        }
     496
     497       
    347498        // last resort -> route over overlay path
    348499        LinkID next_id = overlayInterface->getNextLinkId( destination );
    349         if (next_id.isUnspecified()) {
    350                 logging_warn("Could not send message. No next hop found to " <<
    351                                 destination );
    352                 logging_error("ERROR: " << debugInformation() );
    353                 return -1;
    354         }
    355 
    356         // get link descriptor, up and running? yes-> send message
     500        if ( next_id.isUnspecified() )
     501        {               
     502        // apperently we're the closest node --> try second best node
     503        //   NOTE: This is helpful if our routing table is not up-to-date, but
     504        //   may lead to circles. So we have to be careful.
     505        std::vector<const LinkID*> next_ids =
     506            overlayInterface->getSortedLinkIdsTowardsNode( destination );
     507           
     508        for ( int i = 0; i < next_ids.size(); i++ )
     509        {
     510            const LinkID& link = *next_ids[i];
     511           
     512            if ( ! link.isUnspecified() )
     513            {
     514                next_id = link;
     515               
     516                break;
     517            }
     518        }
     519     
     520        // still no next hop found. drop.
     521        if ( next_id.isUnspecified() )
     522        {
     523            logging_warn("Could not send message. No next hop found to " <<
     524                destination );
     525            logging_error("ERROR: " << debugInformation() );
     526           
     527            throw message_not_sent("No next hop found.");
     528        }
     529        }
     530
     531       
     532        /* get link descriptor, do some checks and send message */
    357533        next_link = getDescriptor(next_id);
    358         if (next_link != NULL && next_link->up) {
    359                 // send message over relayed link
    360                 return send(message, next_link);
    361         }
    362 
    363         // no-> error, dropping message
    364         else {
    365                 logging_warn("Could not send message. Link not known or up");
    366                 logging_error("ERROR: " << debugInformation() );
    367                 return -1;
    368         }
    369 
    370         // not reached-> fail
    371         return -1;
    372 }
     534   
     535    // check pointer
     536    if ( next_link == NULL )
     537    {
     538        // NOTE: this shuldn't happen
     539        throw message_not_sent("Could not send message. Link not known.");
     540    }
     541   
     542    // avoid circles
     543    if ( next_link->remoteNode == last_hop )
     544    {
     545        // XXX logging_debug
     546        logging_info("Possible next hop would create a circle: "
     547            << next_link->remoteNode);
     548       
     549        throw message_not_sent("Could not send message. Possible next hop would create a circle.");
     550    }
     551   
     552    // check if link is up
     553        if ( ! next_link->up)
     554        {
     555        logging_warn("Could not send message. Link not up");
     556        logging_error("ERROR: " << debugInformation() );
     557       
     558        throw message_not_sent("Could not send message. Link not up");
     559        }
     560
     561        // * send message over overlay link *
     562        return send(message, next_link, priority);
     563}
     564
    373565
    374566/// send a message using a link descriptor, delivers it to the base overlay class
    375 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
     567seqnum_t BaseOverlay::send( OverlayMsg* message,
     568        LinkDescriptor* ldr,
     569        uint8_t priority ) throw(message_not_sent)
     570{
    376571        // check if null
    377         if (ldr == NULL) {
    378                 logging_error("Can not send message to " << message->getDestinationAddress());
    379                 return -1;
     572        if (ldr == NULL)
     573        {
     574        ostringstream out;
     575        out << "Can not send message to " << message->getDestinationAddress();
     576        throw message_not_sent(out.str());
    380577        }
    381578
    382579        // check if up
    383         if (!ldr->up && !ignore_down) {
    384                 logging_error("Can not send message. Link not up:" << ldr );
     580        if ( !ldr->up )
     581        {
    385582                logging_error("DEBUG_INFO: " << debugInformation() );
    386                 return -1;
    387         }
    388         LinkDescriptor* ld = NULL;
    389 
    390         // handle relayed link
    391         if (ldr->relayed) {
     583
     584        ostringstream out;
     585        out << "Can not send message. Link not up:" << ldr->to_string();
     586        throw message_not_sent(out.str());
     587        }
     588       
     589        LinkDescriptor* next_hop_ld = NULL;
     590
     591        // BRANCH: relayed link
     592        if (ldr->relayed)
     593        {
    392594                logging_debug("Resolving direct link for relayed link to "
    393595                                << ldr->remoteNode);
    394                 ld = getRelayLinkTo( ldr->remoteNode );
    395                 if (ld==NULL) {
    396                         logging_error("No relay path found to link " << ldr );
     596               
     597                next_hop_ld = getRelayLinkTo( ldr->remoteNode );
     598               
     599                if (next_hop_ld==NULL)
     600                {
    397601                        logging_error("DEBUG_INFO: " << debugInformation() );
    398                         return -1;
    399                 }
    400                 ld->setRelaying();
     602                       
     603                ostringstream out;
     604                out << "No relay path found to link: " << ldr;
     605                throw message_not_sent(out.str());
     606                }
     607               
     608                next_hop_ld->setRelaying();
    401609                message->setRelayed(true);
    402         } else
    403                 ld = ldr;
    404 
    405         // handle direct link
    406         if (ld->communicationUp) {
    407                 logging_debug("send(): Sending message over direct link.");
    408                 return bc->sendMessage( ld->communicationId, message );
    409         } else {
    410                 logging_error("send(): Could not send message. "
    411                                 "Not a relayed link and direct link is not up.");
    412                 return -1;
    413         }
    414         return -1;
     610        }
     611        // BRANCH: direct link
     612        else
     613        {
     614                next_hop_ld = ldr;
     615        }
     616
     617       
     618        // check next hop-link
     619        if ( ! next_hop_ld->communicationUp)
     620        {
     621            throw message_not_sent( "send(): Could not send message."
     622                    " Not a relayed link and direct link is not up." );
     623        }
     624
     625        // send over next link
     626    logging_debug("send(): Sending message over direct link.");
     627    return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
     628
    415629}
    416630
    417631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
    418                 const ServiceID& service) {
     632        uint8_t priority, const ServiceID& service) throw(message_not_sent)
     633{
    419634        message->setSourceNode(nodeId);
    420635        message->setDestinationNode(remote);
    421636        message->setService(service);
    422         return send( message, remote );
    423 }
    424 
    425 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
     637        return send( message, remote, priority );
     638}
     639
     640void BaseOverlay::send_link( OverlayMsg* message,
     641        const LinkID& link,
     642        uint8_t priority ) throw(message_not_sent)
     643{
    426644        LinkDescriptor* ld = getDescriptor(link);
    427         if (ld==NULL) {
    428                 logging_error("Cannot find descriptor to link id=" << link.toString());
    429                 return -1;
    430         }
     645        if (ld==NULL)
     646        {
     647            throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
     648        }
     649       
    431650        message->setSourceNode(nodeId);
    432651        message->setDestinationNode(ld->remoteNode);
     
    437656        message->setService(ld->service);
    438657        message->setRelayed(ld->relayed);
    439         return send( message, ld, ignore_down );
     658   
     659   
     660    try
     661    {
     662        // * send message *
     663        send( message, ld, priority );
     664    }
     665    catch ( message_not_sent& e )
     666    {
     667        // drop failed link
     668        ld->failed = true;
     669        dropLink(ld->overlayId);
     670    }
    440671}
    441672
     
    451682                // relay link still used and alive?
    452683                if (ld==NULL
    453                                 || !ld->isDirectVital()
    454                                 || difftime(route.used, time(NULL)) > 8) {
     684                                || !isLinkDirectVital(ld)
     685                                || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT)  // TODO this was set to 8 before.. Is the new timeout better?
     686                {
    455687                        logging_info("Forgetting relay information to node "
    456688                                        << route.node.toString() );
     
    488720        if (message->isRelayed()) {
    489721                // try to find source node
    490                 BOOST_FOREACH( relay_route& route, relay_routes ) {
     722                foreach( relay_route& route, relay_routes ) {
    491723                        // relay route found? yes->
    492724                        if ( route.node == message->getDestinationNode() ) {
     
    504736
    505737                // try to find source node
    506                 BOOST_FOREACH( relay_route& route, relay_routes ) {
     738                foreach( relay_route& route, relay_routes ) {
    507739
    508740                        // relay route found? yes->
     
    516748                                if (route.hops > message->getNumHops()
    517749                                                || rld == NULL
    518                                                 || !rld->isDirectVital()) {
     750                                                || !isLinkDirectVital(ld)) {
    519751                                        logging_info("Updating relay information to node "
    520752                                                        << route.node.toString()
    521                                                         << " reducing to " << message->getNumHops() << " hops.");
     753                                                        << " reducing to " << (int) message->getNumHops() << " hops.");
    522754                                        route.hops = message->getNumHops();
    523755                                        route.link = ld->overlayId;
     
    542774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
    543775        // try to find source node
    544         BOOST_FOREACH( relay_route& route, relay_routes ) {
     776        foreach( relay_route& route, relay_routes ) {
    545777                if (route.node == remote ) {
    546778                        LinkDescriptor* ld = getDescriptor( route.link );
    547                         if (ld==NULL || !ld->isDirectVital()) return NULL; else {
     779                        if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
    548780                                route.used = time(NULL);
    549781                                return ld;
     
    575807// ----------------------------------------------------------------------------
    576808
    577 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
     809void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
    578810        logging_info("Starting...");
    579811
    580812        // set parameters
    581         bc = &_basecomm;
     813        bc = _basecomm;
    582814        nodeId = _nodeid;
    583815
     
    587819
    588820        // timer for auto link management
    589         Timer::setInterval( 1000 );
     821        Timer::setInterval( 1000 ); // XXX
     822//      Timer::setInterval( 10000 );
    590823        Timer::start();
    591824
     
    641874                overlayInterface->joinOverlay();
    642875                state = BaseOverlayStateCompleted;
    643                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     876                foreach( NodeListener* i, nodeListeners )
    644877                        i->onJoinCompleted( spovnetId );
    645878
     
    682915        // gather all service links
    683916        vector<LinkID> servicelinks;
    684         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     917        foreach( LinkDescriptor* ld, links )
     918        {
    685919                if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
    686920                        servicelinks.push_back( ld->overlayId );
     
    688922
    689923        // drop all service links
    690         BOOST_FOREACH( LinkID lnk, servicelinks )
    691         dropLink( lnk );
     924        foreach( LinkID lnk, servicelinks )
     925        {
     926            logging_debug("Dropping service link " << lnk.toString());
     927            dropLink( lnk );
     928        }
    692929
    693930        // let the node leave the spovnet overlay interface
    694931        logging_debug( "Leaving overlay" );
    695932        if( overlayInterface != NULL )
     933        {
    696934                overlayInterface->leaveOverlay();
     935        }
    697936
    698937        // drop still open bootstrap links
    699         BOOST_FOREACH( LinkID lnk, bootstrapLinks )
    700         bc->dropLink( lnk );
     938        foreach( LinkID lnk, bootstrapLinks )
     939        {
     940            logging_debug("Dropping bootstrap link " << lnk.toString());
     941            bc->dropLink( lnk );
     942        }
    701943
    702944        // change to inalid state
     
    708950
    709951        // inform all registered services of the event
    710         BOOST_FOREACH( NodeListener* i, nodeListeners ) {
     952        foreach( NodeListener* i, nodeListeners )
     953        {
    711954                if( ret ) i->onLeaveCompleted( spovnetId );
    712955                else i->onLeaveFailed( spovnetId );
     
    731974                state = BaseOverlayStateInvalid;
    732975
    733                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     976                foreach( NodeListener* i, nodeListeners )
    734977                i->onJoinFailed( spovnetId );
    735978
     
    7831026                const ServiceID& service ) {
    7841027
     1028    // TODO What if we already have a Link to this node and this service id?
     1029   
    7851030        // do not establish a link to myself!
    786         if (remote == nodeId) return LinkID::UNSPECIFIED;
    787 
     1031        if (remote == nodeId) return
     1032                LinkID::UNSPECIFIED;
     1033
     1034       
    7881035        // create a link descriptor
    7891036        LinkDescriptor* ld = addDescriptor();
     
    7921039        ld->service = service;
    7931040        ld->listener = getListener(ld->service);
     1041   
     1042    // initialize sequence numbers
     1043    ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
     1044    logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
    7941045
    7951046        // create link request message
     
    8001051        msg.setRelayed(true);
    8011052        msg.setRegisterRelay(true);
     1053//      msg.setRouteRecord(true);
     1054   
     1055    msg.setSeqNum(ld->last_sent_seqnum);
    8021056
    8031057        // debug message
     
    8091063        );
    8101064
     1065       
    8111066        // sending message to node
    812         send_node( &msg, ld->remoteNode, ld->service );
    813 
     1067        try
     1068        {
     1069            // * send *
     1070            seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
     1071        }
     1072        catch ( message_not_sent& e )
     1073        {
     1074            logging_warn("Link request not sent: " << e.what());
     1075           
     1076            // Message not sent. Cancel link request.
     1077            SystemQueue::instance().scheduleCall(
     1078                    boost::bind(
     1079                            &BaseOverlay::__onLinkEstablishmentFailed,
     1080                            this,
     1081                            ld->overlayId)
     1082                );
     1083        }
     1084       
    8141085        return ld->overlayId;
    8151086}
    8161087
     1088/// NOTE: "id" is an Overlay-LinkID
     1089void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
     1090{
     1091    // TODO This code redundant. But also it's not easy to aggregate in one function.
     1092   
     1093    // get descriptor for link
     1094    LinkDescriptor* ld = getDescriptor(id, false);
     1095    if ( ld == NULL ) return; // not found? ->ignore!
     1096
     1097    logging_debug( "__onLinkEstablishmentFaild: " << ld );
     1098
     1099    // removing relay link information
     1100    removeRelayLink(ld->overlayId);
     1101
     1102    // inform listeners about link down
     1103    ld->communicationUp = false;
     1104    if (!ld->service.isUnspecified())
     1105    {
     1106        CommunicationListener* lst = getListener(ld->service);
     1107        if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
     1108        sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1109    }
     1110
     1111    // delete all queued messages (auto links)
     1112    if( ld->messageQueue.size() > 0 ) {
     1113        logging_warn( "Dropping link " << id.toString() << " that has "
     1114                << ld->messageQueue.size() << " waiting messages" );
     1115        ld->flushQueue();
     1116    }
     1117
     1118    // erase mapping
     1119    eraseDescriptor(ld->overlayId);
     1120}
     1121
     1122
    8171123/// drops an established link
    818 void BaseOverlay::dropLink(const LinkID& link) {
    819         logging_info( "Dropping link (initiated locally):" << link.toString() );
     1124void BaseOverlay::dropLink(const LinkID& link)
     1125{
     1126        logging_info( "Dropping link: " << link.toString() );
    8201127
    8211128        // find the link item to drop
    8221129        LinkDescriptor* ld = getDescriptor(link);
    823         if( ld == NULL ) {
     1130        if( ld == NULL )
     1131        {
    8241132                logging_warn( "Can't drop link, link is unknown!");
    8251133                return;
     
    8271135
    8281136        // delete all queued messages
    829         if( ld->messageQueue.size() > 0 ) {
     1137        if( ld->messageQueue.size() > 0 )
     1138        {
    8301139                logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
    8311140                                << ld->messageQueue.size() << " waiting messages" );
    8321141                ld->flushQueue();
    8331142        }
    834 
    835         // inform sideport and listener
    836         if(ld->listener != NULL)
    837                 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
    838         sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
    839 
    840         // do not drop relay links
    841         if (!ld->relaying) {
    842                 // drop the link in base communication
    843                 if (ld->communicationUp) bc->dropLink( ld->communicationId );
    844 
    845                 // erase descriptor
    846                 eraseDescriptor( ld->overlayId );
    847         } else {
    848                 ld->dropAfterRelaying = true;
    849         }
     1143       
     1144           
     1145        // inform application and remote note (but only once)
     1146        //   NOTE: If we initiated the drop, this function is called twice, but on
     1147        //   the second call, there is noting to do.
     1148        if ( ld->up && ! ld->failed )
     1149        {
     1150        // inform sideport and listener
     1151        if(ld->listener != NULL)
     1152        {
     1153            ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
     1154        }
     1155        sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
     1156       
     1157        // send link-close to remote node
     1158        logging_info("Sending LinkClose message to remote node.");
     1159        OverlayMsg close_msg(OverlayMsg::typeLinkClose);
     1160        send_link(&close_msg, link, system_priority::OVERLAY);
     1161   
     1162        // deactivate link
     1163        ld->up = false;
     1164//         ld->closing = true;
     1165        }
     1166       
     1167        else if ( ld->failed )
     1168    {
     1169        // inform listener
     1170        if( ld->listener != NULL )
     1171        {
     1172            ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
     1173        }
     1174       
     1175        ld->up = false;
     1176        __removeDroppedLink(ld->overlayId);
     1177    }
     1178}
     1179
     1180/// called from typeLinkClose-handler
     1181void BaseOverlay::__removeDroppedLink(const LinkID& link)
     1182{
     1183    // find the link item to drop
     1184    LinkDescriptor* ld = getDescriptor(link);
     1185    if( ld == NULL )
     1186    {
     1187        return;
     1188    }
     1189
     1190    // do not drop relay links
     1191    if (!ld->relaying)
     1192    {
     1193        // drop the link in base communication
     1194        if (ld->communicationUp)
     1195        {
     1196            bc->dropLink( ld->communicationId );
     1197        }
     1198
     1199        // erase descriptor
     1200        eraseDescriptor( ld->overlayId );
     1201    }
     1202    else
     1203    {
     1204        ld->dropAfterRelaying = true;
     1205    }
    8501206}
    8511207
     
    8531209
    8541210/// internal send message, always use this functions to send messages over links
    855 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
     1211const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
     1212        const LinkID& link,
     1213        uint8_t priority ) throw(message_not_sent)
     1214{
    8561215        logging_debug( "Sending data message on link " << link.toString() );
    8571216
    8581217        // get the mapping for this link
    8591218        LinkDescriptor* ld = getDescriptor(link);
    860         if( ld == NULL ) {
    861                 logging_error("Could not send message. "
    862                                 << "Link not found id=" << link.toString());
    863                 return -1;
     1219        if( ld == NULL )
     1220        {
     1221            throw message_not_sent("Could not send message. Link not found id=" + link.toString());
    8641222        }
    8651223
    8661224        // check if the link is up yet, if its an auto link queue message
    867         if( !ld->up ) {
     1225        if( !ld->up )
     1226        {
    8681227                ld->setAutoUsed();
    869                 if( ld->autolink ) {
     1228                if( ld->autolink )
     1229                {
    8701230                        logging_info("Auto-link " << link.toString() << " not up, queue message");
    871                         Data data = data_serialize( message );
    872                         const_cast<Message*>(message)->dropPayload();
    873                         ld->messageQueue.push_back( new Message(data) );
    874                 } else {
    875                         logging_error("Link " << link.toString() << " not up, drop message");
    876                 }
    877                 return -1;
    878         }
    879 
    880         // compile overlay message (has service and node id)
    881         OverlayMsg overmsg( OverlayMsg::typeData );
    882         overmsg.encapsulate( const_cast<Message*>(message) );
    883 
    884         // send message over relay/direct/overlay
    885         return send_link( &overmsg, ld->overlayId );
    886 }
    887 
    888 
    889 seqnum_t BaseOverlay::sendMessage(const Message* message,
    890                 const NodeID& node, const ServiceID& service) {
     1231                       
     1232                        // queue message
     1233                LinkDescriptor::message_queue_entry msg;
     1234                msg.message = message;
     1235                msg.priority = priority;
     1236
     1237                        ld->messageQueue.push_back( msg );
     1238                       
     1239                        return SequenceNumber::DISABLED;  // TODO what to return if message is queued?
     1240                }
     1241                else
     1242                {
     1243                    throw message_not_sent("Link " + link.toString() + " not up, drop message");
     1244                }
     1245        }
     1246       
     1247        // TODO AKTUELL: sequence numbers
     1248        // TODO seqnum on fast path ?
     1249        ld->last_sent_seqnum.increment();
     1250       
     1251        /* choose fast-path for direct links; normal overlay-path otherwise */
     1252        // BRANCH: direct link
     1253        if ( ld->communicationUp && !ld->relayed )
     1254        {
     1255            // * send down to BaseCommunication *
     1256            try
     1257            {
     1258                bc->sendMessage(ld->communicationId, message, priority, true);
     1259        }
     1260        catch ( communication::communication_message_not_sent& e )
     1261        {
     1262            ostringstream out;
     1263            out << "Communication message on fast-path not sent: " << e.what();
     1264            throw message_not_sent(out.str());
     1265        }
     1266        }
     1267
     1268        // BRANCH: use (slow) overlay-path
     1269        else
     1270        {
     1271        // compile overlay message (has service and node id)
     1272        OverlayMsg overmsg( OverlayMsg::typeData );
     1273        overmsg.set_payload_message(message);
     1274       
     1275        // set SeqNum
     1276        if ( ld->transmit_seqnums )
     1277        {
     1278            overmsg.setSeqNum(ld->last_sent_seqnum);
     1279        }
     1280        logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
     1281   
     1282        // send message over relay/direct/overlay
     1283        send_link( &overmsg, ld->overlayId, priority );
     1284        }
     1285       
     1286        // return seqnum
     1287        return ld->last_sent_seqnum;
     1288}
     1289
     1290
     1291const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
     1292                const NodeID& node, uint8_t priority, const ServiceID& service) {
    8911293
    8921294        // find link for node and service
     
    9071309                if( ld == NULL ) {
    9081310                        logging_error( "Failed to establish auto-link.");
    909                         return -1;
     1311            throw message_not_sent("Failed to establish auto-link.");
    9101312                }
    9111313                ld->autolink = true;
     
    9201322
    9211323        // send / queue message
    922         return sendMessage( message, ld->overlayId );
    923 }
    924 
    925 
    926 NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
    927         const NodeID& address, const ServiceID& service) {
     1324        return sendMessage( message, ld->overlayId, priority );
     1325}
     1326
     1327
     1328NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
     1329        const NodeID& address, uint8_t priority, const ServiceID& service) {
    9281330   
    9291331    if ( overlayInterface->isClosestNodeTo(address) )
     
    9361338    if ( closest_node != NodeID::UNSPECIFIED )
    9371339    {
    938         seqnum_t seqnum = sendMessage(message, closest_node, service);
     1340        sendMessage(message, closest_node, priority, service);
    9391341    }
    9401342   
    941     return closest_node;  // XXX return seqnum ?? tuple? closest_node via (non const) reference?
     1343    return closest_node;  // return seqnum ?? tuple? closest_node via (non const) reference?
    9421344}
    9431345// ----------------------------------------------------------------------------
     
    9781380
    9791381        // see if we can find the node in our own table
    980         BOOST_FOREACH(const LinkDescriptor* ld, links){
     1382        foreach(const LinkDescriptor* ld, links){
    9811383                if(ld->remoteNode != node) continue;
    9821384                if(!ld->communicationUp) continue;
     
    10791481
    10801482void BaseOverlay::onLinkUp(const LinkID& id,
    1081                 const address_v* local, const address_v* remote) {
     1483        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     1484{
    10821485        logging_debug( "Link up with base communication link id=" << id );
    10831486
     
    10851488        LinkDescriptor* ld = getDescriptor(id, true);
    10861489
    1087         // handle bootstrap link we initiated
     1490        // BRANCH: handle bootstrap link we initiated
    10881491        if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
    10891492                logging_info(
    10901493                                "Join has been initiated by me and the link is now up. " <<
     1494                                "LinkID: " << id.toString() <<
    10911495                                "Sending out join request for SpoVNet " << spovnetId.toString()
    10921496                );
     
    10961500                                OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
    10971501                JoinRequest joinRequest( spovnetId, nodeId );
    1098                 overlayMsg.encapsulate( &joinRequest );
    1099                 bc->sendMessage( id, &overlayMsg );
     1502                overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
     1503
     1504                send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
     1505               
    11001506                return;
    11011507        }
    11021508
    1103         // no link found? -> link establishment from remote, add one!
     1509        // BRANCH: link establishment from remote, add one!
    11041510        if (ld == NULL) {
    11051511                ld = addDescriptor( id );
     
    11151521                // in this case, do not inform listener, since service it unknown
    11161522                // -> wait for update message!
    1117 
    1118                 // link mapping found? -> send update message with node-id and service id
    1119         } else {
     1523        }
     1524       
     1525        // BRANCH: We requested this link in the first place
     1526        else
     1527        {
    11201528                logging_info( "onLinkUp descriptor (initiated locally):" << ld );
    11211529
     
    11261534                ld->fromRemote = false;
    11271535
    1128                 // if link is a relayed link->convert to direct link
    1129                 if (ld->relayed) {
    1130                         logging_info( "Converting to direct link: " << ld );
     1536                // BRANCH: this was a relayed link before --> convert to direct link
     1537                //   TODO do we really have to send a message here?
     1538                if (ld->relayed)
     1539                {
    11311540                        ld->up = true;
    11321541                        ld->relayed = false;
     1542                        logging_info( "Converting to direct link: " << ld );
     1543                       
     1544                        // send message
    11331545                        OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
    11341546                        overMsg.setSourceLink( ld->overlayId );
    11351547                        overMsg.setDestinationLink( ld->remoteLink );
    1136                         send_link( &overMsg, ld->overlayId );
    1137                 } else {
     1548                        send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
     1549                       
     1550                    // inform listener
     1551                    if( ld->listener != NULL)
     1552                        ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
     1553                }
     1554               
     1555
     1556        /* NOTE: Chord is opening direct-links in it's setup routine which are
     1557         *   neither set to "relayed" nor to "up". To activate these links a
     1558         *   typeLinkUpdate must be sent.
     1559         *   
     1560         * This branch is would also be taken when we had a working link before
     1561         *   (ld->up == true). I'm not sure if this case does actually happen
     1562         *   and whether it's tested.
     1563         */
     1564                else
     1565                {
    11381566                        // note: necessary to validate the link on the remote side!
    11391567                        logging_info( "Sending out update" <<
     
    11441572                        // compile and send update message
    11451573                        OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
    1146                         overlayMsg.setSourceLink(ld->overlayId);
    11471574                        overlayMsg.setAutoLink( ld->autolink );
    1148                         send_link( &overlayMsg, ld->overlayId, true );
     1575                    overlayMsg.setSourceNode(nodeId);
     1576                    overlayMsg.setDestinationNode(ld->remoteNode);
     1577                    overlayMsg.setSourceLink(ld->overlayId);
     1578                    overlayMsg.setDestinationLink(ld->remoteLink);
     1579                    overlayMsg.setService(ld->service);
     1580                    overlayMsg.setRelayed(false);
     1581
     1582                    // TODO ld->communicationId = id ??
     1583                   
     1584                    send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
    11491585                }
    11501586        }
     
    11521588
    11531589void BaseOverlay::onLinkDown(const LinkID& id,
    1154                 const address_v* local, const address_v* remote) {
    1155 
     1590        const addressing2::EndpointPtr local,
     1591        const addressing2::EndpointPtr remote)
     1592{
    11561593        // erase bootstrap links
    11571594        vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
     
    11851622}
    11861623
     1624
     1625void BaseOverlay::onLinkFail(const LinkID& id,
     1626        const addressing2::EndpointPtr local,
     1627        const addressing2::EndpointPtr remote)
     1628{
     1629        logging_debug( "Link fail with base communication link id=" << id );
     1630
     1631//      // erase bootstrap links
     1632//      vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
     1633//      if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
     1634//
     1635//      // get descriptor for link
     1636//      LinkDescriptor* ld = getDescriptor(id, true);
     1637//      if ( ld == NULL ) return; // not found? ->ignore!
     1638//      logging_debug( "Link failed id=" << ld->overlayId.toString() );
     1639//
     1640//      // inform listeners
     1641//      ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
     1642//      sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1643       
     1644        logging_debug( "  ... calling onLinkDown ..." );
     1645        onLinkDown(id, local, remote);
     1646}
     1647
     1648
    11871649void BaseOverlay::onLinkChanged(const LinkID& id,
    1188                 const address_v* oldlocal, const address_v* newlocal,
    1189                 const address_v* oldremote, const address_v* newremote) {
    1190 
    1191         // get descriptor for link
    1192         LinkDescriptor* ld = getDescriptor(id, true);
    1193         if ( ld == NULL ) return; // not found? ->ignore!
    1194         logging_debug( "onLinkChanged descriptor: " << ld );
    1195 
    1196         // inform listeners
    1197         ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
    1198         sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
    1199 
    1200         // autolinks: refresh timestamp
    1201         ld->setAutoUsed();
    1202 }
    1203 
    1204 void BaseOverlay::onLinkFail(const LinkID& id,
    1205                 const address_v* local, const address_v* remote) {
    1206         logging_debug( "Link fail with base communication link id=" << id );
    1207 
    1208         // erase bootstrap links
    1209         vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
    1210         if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
    1211 
    1212         // get descriptor for link
    1213         LinkDescriptor* ld = getDescriptor(id, true);
    1214         if ( ld == NULL ) return; // not found? ->ignore!
    1215         logging_debug( "Link failed id=" << ld->overlayId.toString() );
    1216 
    1217         // inform listeners
    1218         ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
    1219         sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
    1220 }
    1221 
    1222 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
    1223                 const address_v* remote, const QoSParameterSet& qos) {
    1224         logging_debug( "Link quality changed with base communication link id=" << id );
    1225 
    1226         // get descriptor for link
    1227         LinkDescriptor* ld = getDescriptor(id, true);
    1228         if ( ld == NULL ) return; // not found? ->ignore!
    1229         logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
    1230 }
    1231 
    1232 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
    1233                 const address_v* remote ) {
     1650        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     1651        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
     1652{
     1653    // get descriptor for link
     1654    LinkDescriptor* ld = getDescriptor(id, true);
     1655    if ( ld == NULL ) return; // not found? ->ignore!
     1656    logging_debug( "onLinkChanged descriptor: " << ld );
     1657
     1658    // inform listeners
     1659    ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
     1660    sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
     1661
     1662    // autolinks: refresh timestamp
     1663    ld->setAutoUsed();
     1664}
     1665
     1666//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
     1667//        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     1668//        const QoSParameterSet& qos)
     1669//{
     1670//      logging_debug( "Link quality changed with base communication link id=" << id );
     1671//
     1672//      // get descriptor for link
     1673//      LinkDescriptor* ld = getDescriptor(id, true);
     1674//      if ( ld == NULL ) return; // not found? ->ignore!
     1675//      logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
     1676//}
     1677
     1678bool BaseOverlay::onLinkRequest(const LinkID& id,
     1679        const addressing2::EndpointPtr local,
     1680        const addressing2::EndpointPtr remote)
     1681{
    12341682        logging_debug("Accepting link request from " << remote->to_string() );
     1683       
     1684        // TODO ask application..?
     1685       
    12351686        return true;
    12361687}
    12371688
     1689
     1690
     1691
    12381692/// handles a message from base communication
    1239 bool BaseOverlay::receiveMessage(const Message* message,
    1240                 const LinkID& link, const NodeID& ) {
     1693bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
     1694                const LinkID& link,
     1695                const NodeID&,
     1696                bool bypass_overlay )
     1697{
    12411698        // get descriptor for link
    12421699        LinkDescriptor* ld = getDescriptor( link, true );
    1243         return handleMessage( message, ld, link );
     1700
     1701       
     1702        /* choose fastpath for direct links; normal overlay-path otherwise */   
     1703        if ( bypass_overlay && ld )
     1704        {
     1705        // message received --> link is alive
     1706        ld->keepAliveReceived = time(NULL);
     1707        // hop count on this link
     1708        ld->hops = 0;
     1709
     1710       
     1711        // hand over to CommunicationListener (aka Application)
     1712            CommunicationListener* lst = getListener(ld->service);
     1713            if ( lst != NULL )
     1714            {
     1715                lst->onMessage(
     1716                        message,
     1717                        ld->remoteNode,
     1718                        ld->overlayId,
     1719                    SequenceNumber::DISABLED,
     1720                        NULL );
     1721               
     1722                return true;
     1723            }
     1724
     1725            return false;
     1726        }
     1727        else
     1728        {
     1729            return handleMessage( message, ld, link );     
     1730        }
    12441731}
    12451732
     
    12471734
    12481735/// Handle spovnet instance join requests
    1249 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
    1250 
     1736bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
     1737{
    12511738        // decapsulate message
    1252         JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
     1739        JoinRequest joinReq;
     1740        joinReq.deserialize_from_shared_buffer(message);
     1741       
    12531742        logging_info( "Received join request for spovnet " <<
    1254                         joinReq->getSpoVNetID().toString() );
     1743                        joinReq.getSpoVNetID().toString() );
    12551744
    12561745        // check spovnet id
    1257         if( joinReq->getSpoVNetID() != spovnetId ) {
     1746        if( joinReq.getSpoVNetID() != spovnetId ) {
    12581747                logging_error(
    12591748                                "Received join request for spovnet we don't handle " <<
    1260                                 joinReq->getSpoVNetID().toString() );
    1261                 delete joinReq;
     1749                                joinReq.getSpoVNetID().toString() );
     1750
    12621751                return false;
    12631752        }
     
    12671756        logging_info( "Sending join reply for spovnet " <<
    12681757                        spovnetId.toString() << " to node " <<
    1269                         overlayMsg->getSourceNode().toString() <<
     1758                        source.toString() <<
    12701759                        ". Result: " << (allow ? "allowed" : "denied") );
    1271         joiningNodes.push_back( overlayMsg->getSourceNode() );
     1760        joiningNodes.push_back( source );
    12721761
    12731762        // return overlay parameters
     
    12761765                        << getEndpointDescriptor().toString() )
    12771766        OverlayParameterSet parameters = overlayInterface->getParameters();
     1767       
     1768       
     1769        // create JoinReplay Message
    12781770        OverlayMsg retmsg( OverlayMsg::typeJoinReply,
    12791771                        OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
    1280         JoinReply replyMsg( spovnetId, parameters,
    1281                         allow, getEndpointDescriptor() );
    1282         retmsg.encapsulate(&replyMsg);
    1283         bc->sendMessage( bcLink, &retmsg );
    1284 
    1285         delete joinReq;
     1772        JoinReply replyMsg( spovnetId, parameters, allow );
     1773        retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
     1774
     1775        // XXX This is unlovely clash between the old message system and the new one,
     1776        // but a.t.m. we can't migrate everything to the new system at once..
     1777        // ---> Consider the EndpointDescriptor as part of the JoinReply..
     1778        retmsg.append_buffer(getEndpointDescriptor().serialize());
     1779       
     1780        // * send *
     1781        send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
     1782
    12861783        return true;
    12871784}
    12881785
    12891786/// Handle replies to spovnet instance join requests
    1290 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
     1787bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
     1788{
    12911789        // decapsulate message
    12921790        logging_debug("received join reply message");
    1293         JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
     1791        JoinReply replyMsg;
     1792        EndpointDescriptor endpoints;
     1793        reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
     1794        buff = endpoints.deserialize(buff);
    12941795
    12951796        // correct spovnet?
    1296         if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
     1797        if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
    12971798                logging_error( "Received SpoVNet join reply for " <<
    1298                                 replyMsg->getSpoVNetID().toString() <<
     1799                                replyMsg.getSpoVNetID().toString() <<
    12991800                                " != " << spovnetId.toString() );
    1300                 delete replyMsg;
     1801
    13011802                return false;
    13021803        }
    13031804
    13041805        // access granted? no -> fail
    1305         if( !replyMsg->getJoinAllowed() ) {
     1806        if( !replyMsg.getJoinAllowed() ) {
    13061807                logging_error( "Our join request has been denied" );
    13071808
     
    13171818
    13181819                // inform all registered services of the event
    1319                 BOOST_FOREACH( NodeListener* i, nodeListeners )
     1820                foreach( NodeListener* i, nodeListeners )
    13201821                i->onJoinFailed( spovnetId );
    13211822
    1322                 delete replyMsg;
    13231823                return true;
    13241824        }
     
    13291829
    13301830        logging_debug( "Using bootstrap end-point "
    1331                         << replyMsg->getBootstrapEndpoint().toString() );
     1831                        << endpoints.toString() );
    13321832
    13331833        // create overlay structure from spovnet parameter set
     
    13381838
    13391839                overlayInterface = OverlayFactory::create(
    1340                                 *this, replyMsg->getParam(), nodeId, this );
     1840                                *this, replyMsg.getParam(), nodeId, this );
    13411841
    13421842                // overlay structure supported? no-> fail!
     
    13541854
    13551855                        // inform all registered services of the event
    1356                         BOOST_FOREACH( NodeListener* i, nodeListeners )
     1856                        foreach( NodeListener* i, nodeListeners )
    13571857                        i->onJoinFailed( spovnetId );
    13581858
    1359                         delete replyMsg;
    13601859                        return true;
    13611860                }
     
    13651864                overlayInterface->createOverlay();
    13661865
    1367                 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
    1368                 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
     1866                overlayInterface->joinOverlay( endpoints );
     1867                overlayBootstrap.recordJoin( endpoints );
    13691868
    13701869                // update ovlvis
     
    13721871
    13731872                // inform all registered services of the event
    1374                 BOOST_FOREACH( NodeListener* i, nodeListeners )
    1375                 i->onJoinCompleted( spovnetId );
    1376 
    1377                 delete replyMsg;
    1378 
    1379         } else {
    1380 
     1873                foreach( NodeListener* i, nodeListeners )
     1874                    i->onJoinCompleted( spovnetId );
     1875        }
     1876        else
     1877        {
    13811878                // this is not the first bootstrap, just join the additional node
    13821879                logging_debug("not first-time bootstrapping");
    1383                 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
    1384                 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
    1385 
    1386                 delete replyMsg;
    1387 
     1880                overlayInterface->joinOverlay( endpoints );
     1881                overlayBootstrap.recordJoin( endpoints );
    13881882        } // if( overlayInterface == NULL )
    13891883
     
    13921886
    13931887
    1394 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     1888bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
     1889{
    13951890        // get service
    1396         const ServiceID& service = overlayMsg->getService();
     1891        const ServiceID& service = ld->service; //overlayMsg->getService();
     1892
    13971893        logging_debug( "Received data for service " << service.toString()
    13981894                        << " on link " << overlayMsg->getDestinationLink().toString() );
     
    14021898        if(lst != NULL){
    14031899                lst->onMessage(
    1404                                 overlayMsg,
    1405                                 overlayMsg->getSourceNode(),
    1406                                 overlayMsg->getDestinationLink()
     1900                                message,
     1901//                              overlayMsg->getSourceNode(),
     1902//                              overlayMsg->getDestinationLink(),
     1903                                ld->remoteNode,
     1904                                ld->overlayId,
     1905                overlayMsg->getSeqNum(),
     1906                                overlayMsg
    14071907                );
    14081908        }
     
    14111911}
    14121912
     1913bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
     1914{
     1915    /**
     1916     * Deserialize MessageLost-Message
     1917     *
     1918     * - Type of lost message
     1919     * - Hop count of lost message
     1920     * - Source-LinkID  of lost message
     1921     */
     1922    const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
     1923    uint8_t type = buff[0];
     1924    uint8_t hops = buff[1];
     1925    LinkID linkid;
     1926    linkid.deserialize(message(sizeof(uint8_t)*2));
     1927   
     1928    logging_warn("Node " << msg->getSourceNode()
     1929            << " informed us, that our message of type " << (int) type
     1930            << " is lost after traveling " << (int) hops << " hops."
     1931            << " (LinkID: " << linkid.toString());
     1932
     1933   
     1934    // TODO switch-case ?
     1935   
     1936    // BRANCH: LinkRequest --> link request failed
     1937    if ( type == OverlayMsg::typeLinkRequest )
     1938    {
     1939        __onLinkEstablishmentFailed(linkid);
     1940    }
     1941   
     1942    // BRANCH: Data --> link disrupted. Drop link.
     1943    //   (We could use something more advanced here. e.g. At least send a
     1944    //    keep-alive message and wait for a keep-alive reply.)
     1945    if ( type == OverlayMsg::typeData )
     1946    {
     1947        LinkDescriptor* link_desc = getDescriptor(linkid);
     1948       
     1949        if ( link_desc )
     1950        {
     1951            link_desc->failed = true;
     1952        }
     1953       
     1954        dropLink(linkid);
     1955    }
     1956   
     1957    // BRANCH: ping lost
     1958    if ( type == OverlayMsg::typePing )
     1959    {
     1960        CommunicationListener* lst = getListener(msg->getService());
     1961        if( lst != NULL )
     1962        {
     1963            lst->onPingLost(msg->getSourceNode());
     1964        }
     1965    }
     1966   
     1967    return true;
     1968}
     1969
     1970bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     1971{
     1972    // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
     1973   
     1974    bool send_pong = false;
     1975   
     1976    // inform application and ask permission to send a pong message
     1977    CommunicationListener* lst = getListener(overlayMsg->getService());
     1978    if( lst != NULL )
     1979    {
     1980        send_pong = lst->onPing(overlayMsg->getSourceNode());
     1981    }
     1982   
     1983    // send pong message if allowed
     1984    if ( send_pong )
     1985    {
     1986        OverlayMsg pong_msg(OverlayMsg::typePong);
     1987        pong_msg.setSeqNum(overlayMsg->getSeqNum());
     1988       
     1989        // send message
     1990        try
     1991        {
     1992            send_node( &pong_msg,
     1993                overlayMsg->getSourceNode(),
     1994                system_priority::OVERLAY,
     1995                overlayMsg->getService() );
     1996        }
     1997        catch ( message_not_sent& e )
     1998        {
     1999            logging_info("Could not send Pong-Message to node: " <<
     2000                overlayMsg->getSourceNode());
     2001        }
     2002    }
     2003}
     2004
     2005bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     2006{
     2007    // inform application
     2008    CommunicationListener* lst = getListener(overlayMsg->getService());
     2009    if( lst != NULL )
     2010    {
     2011        lst->onPong(overlayMsg->getSourceNode());
     2012    }
     2013}
    14132014
    14142015bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     
    14392040                overlayMsg->setSourceLink(ld->overlayId);
    14402041                overlayMsg->setService(ld->service);
    1441                 send( overlayMsg, ld );
     2042                send( overlayMsg, ld, system_priority::OVERLAY );
    14422043        }
    14432044
     
    14812082        if( ld->messageQueue.size() > 0 ) {
    14822083                logging_info( "Sending out queued messages on link " << ld );
    1483                 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
    1484                         sendMessage( msg, ld->overlayId );
    1485                         delete msg;
    1486                 }
     2084        foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
     2085        {
     2086            sendMessage( msg.message, ld->overlayId, msg.priority );
     2087        }
    14872088                ld->messageQueue.clear();
    14882089        }
     
    14972098/// handle a link request and reply
    14982099bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
    1499         logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
    15002100
    15012101        //TODO: Check if a request has already been sent using getSourceLink() ...
     
    15142114        ldn->remoteNode = overlayMsg->getSourceNode();
    15152115        ldn->remoteLink = overlayMsg->getSourceLink();
    1516 
     2116        ldn->hops = overlayMsg->getNumHops();
     2117       
     2118    // initialize sequence numbers
     2119    ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
     2120    logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
     2121   
     2122   
    15172123        // update time-stamps
    15182124        ldn->setAlive();
    15192125        ldn->setAutoUsed();
    15202126
     2127        logging_info( "Link request received from node id="
     2128                << overlayMsg->getSourceNode()
     2129                << " LINK: "
     2130                << ldn);
     2131       
    15212132        // create reply message and send back!
    15222133        overlayMsg->swapRoles(); // swap source/destination
    15232134        overlayMsg->setType(OverlayMsg::typeLinkReply);
    15242135        overlayMsg->setSourceLink(ldn->overlayId);
    1525         overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
    15262136        overlayMsg->setRelayed(true);
    1527         send( overlayMsg, ld ); // send back to link
     2137//      overlayMsg->setRouteRecord(true);
     2138    overlayMsg->setSeqNum(ld->last_sent_seqnum);
     2139       
     2140        // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
     2141        // append our endpoints (for creation of a direct link)
     2142        overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
     2143       
     2144        send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
    15282145
    15292146        // inform listener
     
    15342151}
    15352152
    1536 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
    1537 
     2153bool BaseOverlay::handleLinkReply(
     2154        OverlayMsg* overlayMsg,
     2155        reboost::shared_buffer_t sub_message,
     2156        LinkDescriptor* ld )
     2157{
     2158    // deserialize EndpointDescriptor
     2159    EndpointDescriptor endpoints;
     2160    endpoints.deserialize(sub_message);
     2161   
    15382162        // find link request
    15392163        LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
     
    15542178
    15552179        // debug message
    1556         logging_debug( "Link request reply received. Establishing link"
     2180        logging_info( "Link request reply received. Establishing link"
    15572181                        << " for service " << overlayMsg->getService().toString()
    15582182                        << " with local id=" << overlayMsg->getDestinationLink()
    15592183                        << " and remote link id=" << overlayMsg->getSourceLink()
    1560                         << " to " << overlayMsg->getSourceEndpoint().toString()
     2184                        << " to " << endpoints.toString()
     2185                        << " hop count: " << overlayMsg->getRouteRecord().size()
    15612186        );
    15622187
     
    15772202                logging_info( "Sending out queued messages on link " <<
    15782203                                ldn->overlayId.toString() );
    1579                 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
    1580                         sendMessage( msg, ldn->overlayId );
    1581                         delete msg;
     2204                foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
     2205                {
     2206                        sendMessage( msg.message, ldn->overlayId, msg.priority );
    15822207                }
    15832208                ldn->messageQueue.clear();
     
    15892214        // try to replace relay link with direct link
    15902215        ldn->retryCounter = 3;
    1591         ldn->endpoint = overlayMsg->getSourceEndpoint();
     2216        ldn->endpoint = endpoints;
    15922217        ldn->communicationId =  bc->establishLink( ldn->endpoint );
    15932218
     
    15962221
    15972222/// handle a keep-alive message for a link
    1598 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
     2223bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
     2224{
    15992225        LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
    1600         if ( rld != NULL ) {
    1601                 logging_debug("Keep-Alive for " <<
    1602                                 overlayMsg->getDestinationLink() );
     2226       
     2227        if ( rld != NULL )
     2228        {
     2229                logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
    16032230                if (overlayMsg->isRouteRecord())
     2231                {
    16042232                        rld->routeRecord = overlayMsg->getRouteRecord();
     2233                }
     2234               
     2235                // set alive
    16052236                rld->setAlive();
     2237               
     2238               
     2239                /* answer keep alive */
     2240                if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
     2241                {
     2242            time_t now = time(NULL);
     2243            logging_debug("[BaseOverlay] Answering KeepAlive over "
     2244                    << ld->to_string()
     2245                    << " after "
     2246                    << difftime( now, ld->keepAliveSent )
     2247                    << "s");
     2248           
     2249            OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
     2250                    OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
     2251            msg.setRouteRecord(true);
     2252            ld->keepAliveSent = now;
     2253            send_link( &msg, ld->overlayId, system_priority::OVERLAY );
     2254                }
     2255
    16062256                return true;
    1607         } else {
    1608                 logging_error("Keep-Alive for "
     2257        }
     2258        else
     2259        {
     2260                logging_error("No Keep-Alive for "
    16092261                                << overlayMsg->getDestinationLink() << ": link unknown." );
    16102262                return false;
     
    16362288        // erase the original descriptor
    16372289        eraseDescriptor(ld->overlayId);
     2290       
     2291    // inform listener
     2292    if( rld->listener != NULL)
     2293        rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
     2294       
    16382295        return true;
    16392296}
    16402297
    16412298/// handles an incoming message
    1642 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
    1643                 const LinkID bcLink ) {
    1644         logging_debug( "Handling message: " << message->toString());
    1645 
     2299bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
     2300                const LinkID bcLink )
     2301{
    16462302        // decapsulate overlay message
    1647         OverlayMsg* overlayMsg =
    1648                         const_cast<Message*>(message)->decapsulate<OverlayMsg>();
    1649         if( overlayMsg == NULL ) return false;
    1650 
     2303        OverlayMsg* overlayMsg = new OverlayMsg();
     2304        reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
     2305
     2306//      // XXX debug
     2307//      logging_info( "Received overlay message."
     2308//              << " Hops: " << (int) overlayMsg->getNumHops()
     2309//              << " Type: " << (int) overlayMsg->getType()
     2310//              << " Payload size: " << sub_buff.size()
     2311//             << " SeqNum: " << overlayMsg->getSeqNum() );
     2312       
     2313       
    16512314        // increase number of hops
    16522315        overlayMsg->increaseNumHops();
     
    16602323        // handle signaling messages (do not route!)
    16612324        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
    1662                         overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
    1663                 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
     2325                        overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
     2326        {
     2327                overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
    16642328                delete overlayMsg;
    16652329                return true;
     
    16732337                                << " to " << overlayMsg->getDestinationNode()
    16742338                );
    1675                 route( overlayMsg );
     2339               
     2340//              // XXX testing AKTUELL
     2341//        logging_info("MARIO: Routing message "
     2342//                << " from " << overlayMsg->getSourceNode()
     2343//                << " to " << overlayMsg->getDestinationNode() );
     2344//        logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
     2345                overlayMsg->append_buffer(sub_buff);
     2346               
     2347                route( overlayMsg, ld->remoteNode );
    16762348                delete overlayMsg;
    16772349                return true;
    16782350        }
    16792351
    1680         // handle base overlay message
     2352       
     2353        /* handle base overlay message */
    16812354        bool ret = false; // return value
    1682         switch ( overlayMsg->getType() ) {
    1683 
    1684         // data transport messages
    1685         case OverlayMsg::typeData:
    1686                 ret = handleData(overlayMsg, ld);                       break;
    1687 
    1688                 // overlay setup messages
    1689         case OverlayMsg::typeJoinRequest:
    1690                 ret = handleJoinRequest(overlayMsg, bcLink );   break;
    1691         case OverlayMsg::typeJoinReply:
    1692                 ret = handleJoinReply(overlayMsg, bcLink );     break;
    1693 
    1694                 // link specific messages
    1695         case OverlayMsg::typeLinkRequest:
    1696                 ret = handleLinkRequest(overlayMsg, ld );       break;
    1697         case OverlayMsg::typeLinkReply:
    1698                 ret = handleLinkReply(overlayMsg, ld );         break;
    1699         case OverlayMsg::typeLinkUpdate:
    1700                 ret = handleLinkUpdate(overlayMsg, ld );        break;
    1701         case OverlayMsg::typeLinkAlive:
    1702                 ret = handleLinkAlive(overlayMsg, ld );         break;
    1703         case OverlayMsg::typeLinkDirect:
    1704                 ret = handleLinkDirect(overlayMsg, ld );        break;
    1705 
    1706                 // handle unknown message type
    1707         default: {
    1708                 logging_error( "received message in invalid state! don't know " <<
    1709                                 "what to do with this message of type " << overlayMsg->getType() );
    1710                 ret = false;
    1711                 break;
    1712         }
     2355        try
     2356        {
     2357        switch ( overlayMsg->getType() )
     2358        {
     2359            // data transport messages
     2360            case OverlayMsg::typeData:
     2361            {
     2362                // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
     2363                LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
     2364               
     2365                if ( ! end_to_end_ld )
     2366                {
     2367                    logging_warn("Error: Data-Message claims to belong to a link we don't know.");
     2368                   
     2369                    ret = false;
     2370                }
     2371                else
     2372                {
     2373                    // message received --> link is alive
     2374                    end_to_end_ld->keepAliveReceived = time(NULL);
     2375                    // hop count on this link
     2376                    end_to_end_ld->hops = overlayMsg->getNumHops();
     2377                   
     2378                    // * call handler *
     2379                    ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
     2380                }
     2381               
     2382                break;
     2383            }
     2384            case OverlayMsg::typeMessageLost:
     2385                ret = handleLostMessage(sub_buff, overlayMsg);
     2386               
     2387                break;
     2388       
     2389                // overlay setup messages
     2390            case OverlayMsg::typeJoinRequest:
     2391                ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink );        break;
     2392            case OverlayMsg::typeJoinReply:
     2393                ret = handleJoinReply(sub_buff, bcLink );       break;
     2394       
     2395                // link specific messages
     2396            case OverlayMsg::typeLinkRequest:
     2397                ret = handleLinkRequest(overlayMsg, ld );       break;
     2398            case OverlayMsg::typeLinkReply:
     2399                ret = handleLinkReply(overlayMsg, sub_buff, ld );       break;
     2400            case OverlayMsg::typeLinkUpdate:
     2401                ret = handleLinkUpdate(overlayMsg, ld );        break;
     2402            case OverlayMsg::typeKeepAlive:
     2403            case OverlayMsg::typeKeepAliveReply:
     2404                ret = handleLinkAlive(overlayMsg, ld );         break;
     2405            case OverlayMsg::typeLinkDirect:
     2406                ret = handleLinkDirect(overlayMsg, ld );        break;
     2407               
     2408            case OverlayMsg::typeLinkClose:
     2409            {
     2410                dropLink(overlayMsg->getDestinationLink());
     2411                __removeDroppedLink(overlayMsg->getDestinationLink());
     2412               
     2413                break;
     2414            }
     2415           
     2416            /// ping over overlay path (or similar)
     2417            case OverlayMsg::typePing:
     2418            {
     2419                ret = handlePing(overlayMsg, ld);
     2420                break;
     2421            }
     2422            case OverlayMsg::typePong:
     2423            {
     2424                ret = handlePong(overlayMsg, ld);
     2425                break;
     2426            }
     2427           
     2428                // handle unknown message type
     2429            default:
     2430            {
     2431                logging_error( "received message in invalid state! don't know " <<
     2432                        "what to do with this message of type " << overlayMsg->getType() );
     2433                ret = false;
     2434                break;
     2435            }
     2436        }
     2437        }
     2438        catch ( reboost::illegal_sub_buffer& e )
     2439        {
     2440            logging_error( "Failed to create sub-buffer while reading message: »"
     2441                    << e.what()
     2442                    << "« Message too short? ");
     2443           
     2444            assert(false); // XXX
    17132445        }
    17142446
     
    17202452// ----------------------------------------------------------------------------
    17212453
    1722 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
     2454void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
    17232455
    17242456        logging_debug( "broadcasting message to all known nodes " <<
    17252457                        "in the overlay from service " + service.toString() );
    1726 
    1727         if(message == NULL) return;
    1728         message->setReleasePayload(false);
    17292458
    17302459        OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
     
    17322461                NodeID& id = nodes.at(i);
    17332462                if(id == this->nodeId) continue; // don't send to ourselfs
    1734                 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send
    1735                 sendMessage( message, id, service );
     2463
     2464                sendMessage( message, id, priority, service );
    17362465        }
    17372466}
     
    17552484vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
    17562485        vector<LinkID> linkvector;
    1757         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     2486        foreach( LinkDescriptor* ld, links ) {
    17582487                if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
    17592488                        linkvector.push_back( ld->overlayId );
     
    17792508        updateVisual();
    17802509}
     2510
     2511
     2512
     2513/* link status */
     2514bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
     2515{
     2516    const LinkDescriptor* ld = getDescriptor(lnk);
     2517   
     2518    if (!ld)
     2519        return false;
     2520   
     2521    return ld->communicationUp && !ld->relayed;
     2522}
     2523
     2524int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
     2525{
     2526    const LinkDescriptor* ld = getDescriptor(lnk);
     2527   
     2528    if (!ld)
     2529        return -1;
     2530   
     2531    return ld->hops;   
     2532}
     2533
     2534
     2535bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
     2536{
     2537    time_t now = time(NULL);
     2538
     2539    return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
     2540}
     2541
     2542bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
     2543{
     2544    return isLinkVital(link) && link->communicationUp && !link->relayed;
     2545}
     2546
     2547/* [link status] */
     2548
    17812549
    17822550void BaseOverlay::updateVisual(){
     
    18782646        static set<NodeID> linkset;
    18792647        set<NodeID> remotenodes;
    1880         BOOST_FOREACH( LinkDescriptor* ld, links ) {
    1881                 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
     2648        foreach( LinkDescriptor* ld, links ) {
     2649                if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
    18822650                        continue;
    18832651
     
    18952663        do{
    18962664                changed = false;
    1897                 BOOST_FOREACH(NodeID n, linkset){
     2665                foreach(NodeID n, linkset){
    18982666                        if(remotenodes.find(n) == remotenodes.end()){
    18992667                                visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
     
    19082676        do{
    19092677                changed = false;
    1910                 BOOST_FOREACH(NodeID n, remotenodes){
     2678                foreach(NodeID n, remotenodes){
    19112679                        if(linkset.find(n) == linkset.end()){
    19122680                                visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
     
    19332701        // dump link state
    19342702        s << "--- link state -------------------------------" << endl;
    1935         BOOST_FOREACH( LinkDescriptor* ld, links ) {
     2703        foreach( LinkDescriptor* ld, links ) {
    19362704                s << "link " << i << ": " << ld << endl;
    19372705                i++;
  • source/ariba/overlay/BaseOverlay.h

    r10653 r12060  
    4747#include <vector>
    4848#include <deque>
     49#include <stdexcept>
    4950#include <boost/foreach.hpp>
     51
     52#ifdef ECLIPSE_PARSER
     53    #define foreach(a, b) for(a : b)
     54#else
     55    #define foreach(a, b) BOOST_FOREACH(a, b)
     56#endif
    5057
    5158#include "ariba/utility/messages.h"
     
    6471#include "ariba/overlay/modules/OverlayStructureEvents.h"
    6572#include "ariba/overlay/OverlayBootstrap.h"
     73#include "ariba/overlay/SequenceNumber.h"
    6674
    6775// forward declarations
     
    92100using ariba::communication::BaseCommunication;
    93101using ariba::communication::CommunicationEvents;
     102
     103// transport
     104//using ariba::transport::system_priority;
    94105
    95106// utilities
     
    103114using ariba::utility::Demultiplexer;
    104115using ariba::utility::MessageReceiver;
    105 using ariba::utility::MessageSender;
    106116using ariba::utility::seqnum_t;
    107117using ariba::utility::Timer;
     
    110120namespace overlay {
    111121
    112 using namespace ariba::addressing;
     122
     123
     124class message_not_sent: public std::runtime_error
     125{
     126public:
     127    /** Takes a character string describing the error.  */
     128    explicit message_not_sent(const string& __arg)  :
     129        std::runtime_error(__arg)
     130    {
     131    }
     132   
     133    virtual ~message_not_sent() throw() {}
     134};
     135
     136
    113137
    114138class LinkDescriptor;
     
    121145                protected Timer {
    122146
    123         friend class OneHop;
     147//      friend class OneHop;  // DEPRECATED
    124148        friend class Chord;
    125149        friend class ariba::SideportListener;
     
    128152
    129153public:
    130 
    131154        /**
    132155         * Constructs an empty non-functional base overlay instance
     
    142165         * Starts the Base Overlay instance
    143166         */
    144         void start(BaseCommunication& _basecomm, const NodeID& _nodeid);
     167        void start(BaseCommunication* _basecomm, const NodeID& _nodeid);
    145168
    146169        /**
     
    161184         * Starts a link establishment procedure to the specfied node
    162185         * for the service with id service
    163          *
     186         * 
    164187         * @param node Destination node id
    165188         * @param service Service to connect to
     
    179202        void dropLink( const LinkID& link );
    180203
     204       
     205       
     206        /* +++++ Message sending +++++ */
     207       
     208       
    181209        /// sends a message over an existing link
    182         seqnum_t sendMessage(const Message* message, const LinkID& link );
     210        const SequenceNumber& sendMessage(reboost::message_t message,
     211                const LinkID& link,
     212                uint8_t priority ) throw(message_not_sent);
    183213
    184214        /// sends a message to a node and a specific service
    185         seqnum_t sendMessage(const Message* message, const NodeID& remote,
    186                 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
     215    const SequenceNumber& sendMessage(reboost::message_t message,
     216                const NodeID& remote,
     217                uint8_t priority,
     218                const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
     219
    187220
    188221    /**
     
    191224     *  @return NodeID of the (closest) destination node;
    192225     */
    193         NodeID sendMessageCloserToNodeID(const Message* message, const NodeID& address,
    194                 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
     226        NodeID sendMessageCloserToNodeID(reboost::message_t message, const NodeID& address,
     227                uint8_t priority, const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
    195228       
    196229        /**
     
    198231         * Depending on the structure of the overlay, this can be very different.
    199232         */
    200         void broadcastMessage(Message* message, const ServiceID& service);
    201 
     233        void broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority);
     234
     235       
     236        /* +++++ [Message sending] +++++ */
     237       
     238       
     239       
    202240        /**
    203241         * Returns the end-point descriptor of a link.
     
    294332         */
    295333        void leaveSpoVNet();
     334       
     335       
     336        /* link status */
     337        bool isLinkDirect(const ariba::LinkID& lnk) const;
     338        int getHopCount(const ariba::LinkID& lnk) const;
     339
     340    bool isLinkVital(const LinkDescriptor* link) const;
     341    bool isLinkDirectVital(const LinkDescriptor* link) const;
    296342
    297343protected:
    298         /**
    299          * @see ariba::communication::CommunicationEvents.h
    300          */
    301         virtual void onLinkUp(const LinkID& id, const address_v* local,
    302                 const address_v* remote);
    303 
    304         /**
    305          * @see ariba::communication::CommunicationEvents.h
    306          */
    307         virtual void onLinkDown(const LinkID& id, const address_v* local,
    308                 const address_v* remote);
    309 
    310         /**
    311          * @see ariba::communication::CommunicationEvents.h
    312          */
    313         virtual void onLinkChanged(const LinkID& id,
    314                 const address_v* oldlocal, const address_v* newlocal,
    315                 const address_v* oldremote, const address_v* newremote);
    316 
    317         /**
    318          * @see ariba::communication::CommunicationEvents.h
    319          */
    320         virtual void onLinkFail(const LinkID& id, const address_v* local,
    321                 const address_v* remote);
    322 
    323         /**
    324          * @see ariba::communication::CommunicationEvents.h
    325          */
    326         virtual void onLinkQoSChanged(const LinkID& id,
    327                 const address_v* local, const address_v* remote,
    328                 const QoSParameterSet& qos);
    329 
    330         /**
    331          * @see ariba::communication::CommunicationEvents.h
    332          */
    333         virtual bool onLinkRequest(const LinkID& id, const address_v* local,
    334                 const address_v* remote);
    335 
     344
     345    /**
     346     * @see ariba::communication::CommunicationEvents.h
     347     */
     348    virtual bool onLinkRequest(const LinkID& id,
     349            const addressing2::EndpointPtr local,
     350            const addressing2::EndpointPtr remote);
     351
     352    /**
     353     * @see ariba::communication::CommunicationEvents.h
     354     */
     355    virtual void onLinkUp(const LinkID& id,
     356            const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
     357
     358    /**
     359     * @see ariba::communication::CommunicationEvents.h
     360     */
     361    virtual void onLinkDown(const LinkID& id,
     362            const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
     363
     364    /**
     365     * @see ariba::communication::CommunicationEvents.h
     366     */
     367    virtual void onLinkChanged(const LinkID& id,
     368        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     369        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote);
     370
     371    /**
     372     * @see ariba::communication::CommunicationEvents.h
     373     *
     374     * NOTE: Just calls onLinkDown (at the moment..)
     375     */
     376    virtual void onLinkFail(const LinkID& id,
     377            const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
     378
     379    /**
     380     * @see ariba::communication::CommunicationEvents.h
     381     */
     382//    virtual void onLinkQoSChanged(const LinkID& id,
     383//            const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     384//            const QoSParameterSet& qos);
     385
     386       
     387       
     388       
    336389        /**
    337390         * Processes a received message from BaseCommunication
     
    340393         * the node the message came from!
    341394         */
    342         virtual bool receiveMessage( const Message* message, const LinkID& link,
    343                 const NodeID& );
     395        virtual bool receiveMessage( reboost::shared_buffer_t message,
     396                const LinkID& link,
     397                const NodeID&,
     398                bool bypass_overlay );
    344399
    345400        /**
     
    359414        std::string getLinkHTMLInfo();
    360415
     416
     417private:
     418        /// NOTE: "id" is an Overlay-LinkID
     419        void __onLinkEstablishmentFailed(const LinkID& id);
     420       
     421        /// called from typeLinkClose-handler
     422    void __removeDroppedLink(const LinkID& link);
     423       
    361424private:
    362425        /// is the base overlay started yet
     
    396459
    397460        /// demultiplexes a incoming message with link descriptor
    398         bool handleMessage( const Message* message, LinkDescriptor* ld,
     461        bool handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
    399462                const LinkID bcLink = LinkID::UNSPECIFIED );
    400463
    401464        // handle data and signalling messages
    402         bool handleData( OverlayMsg* msg, LinkDescriptor* ld );
     465        bool handleData( reboost::shared_buffer_t message, OverlayMsg* msg, LinkDescriptor* ld );
     466        bool handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg );
    403467        bool handleSignaling( OverlayMsg* msg, LinkDescriptor* ld );
    404468
    405469        // handle join request / reply messages
    406         bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink );
    407         bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink );
     470        bool handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink );
     471        bool handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink );
    408472
    409473        // handle link messages
    410474        bool handleLinkRequest( OverlayMsg* msg, LinkDescriptor* ld );
    411         bool handleLinkReply( OverlayMsg* msg, LinkDescriptor* ld );
     475        bool handleLinkReply( OverlayMsg* msg, reboost::shared_buffer_t sub_message, LinkDescriptor* ld );
    412476        bool handleLinkUpdate( OverlayMsg* msg, LinkDescriptor* ld );
    413477        bool handleLinkDirect( OverlayMsg* msg, LinkDescriptor* ld );
    414478        bool handleLinkAlive( OverlayMsg* msg, LinkDescriptor* ld );
     479   
     480    // ping-pong over overlaypath/routing
     481    bool handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld );
     482    bool handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld );
    415483
    416484
     
    478546        // internal message delivery -----------------------------------------------
    479547
     548    // Convert OverlayMessage into new format and give it down to BaseCommunication
     549    seqnum_t send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority );
     550
     551   
    480552        /// routes a message to its destination node
    481         void route( OverlayMsg* message );
    482 
     553        void route( OverlayMsg* message, const NodeID& last_hop = NodeID::UNSPECIFIED );
     554       
    483555        /// sends a raw message to another node, delivers it to the base overlay class
    484         seqnum_t send( OverlayMsg* message, const NodeID& destination );
     556        /// may throw "message_not_sent"-exception
     557        seqnum_t send( OverlayMsg* message,
     558                   const NodeID& destination,
     559                   uint8_t priority,
     560                const NodeID& last_hop = NodeID::UNSPECIFIED )
     561    throw(message_not_sent);
    485562
    486563        /// send a raw message using a link descriptor, delivers it to the base overlay class
    487         seqnum_t send( OverlayMsg* message, LinkDescriptor* ld,
    488                 bool ignore_down = false );
     564        seqnum_t send( OverlayMsg* message,
     565                LinkDescriptor* ld,
     566                uint8_t priority ) throw(message_not_sent);
    489567
    490568        /// send a message using a node id using overlay routing
    491569        /// sets necessary fields in the overlay message!
    492         seqnum_t send_node( OverlayMsg* message, const NodeID& remote,
    493                 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
     570        /// may throw "message_not_sent"-exception
     571        seqnum_t send_node( OverlayMsg* message, const NodeID& remote, uint8_t priority,
     572                const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID) throw(message_not_sent);
    494573
    495574        /// send a message using a node id using overlay routing using a link
    496575        /// sets necessary fields in the overlay message!
    497         seqnum_t send_link( OverlayMsg* message, const LinkID& link,
    498                 bool ignore_down = false );
    499 
     576        void send_link( OverlayMsg* message,
     577                const LinkID& link,
     578                uint8_t priority ) throw(message_not_sent);
     579
     580       
     581        /// sends a notification to a sender from whom we just dropped a message
     582        void report_lost_message( const OverlayMsg* message );
     583       
    500584        // misc --------------------------------------------------------------------
    501585
  • source/ariba/overlay/CMakeLists.txt

    r10700 r12060  
    4141    LinkDescriptor.h
    4242    OverlayBootstrap.h
     43    SequenceNumber.h
    4344    )
    4445
     
    4748    LinkDescriptor.cpp
    4849    OverlayBootstrap.cpp
     50    SequenceNumber.cpp
    4951    )
    5052
  • source/ariba/overlay/LinkDescriptor.h

    r6961 r12060  
    1212#include "ariba/communication/EndpointDescriptor.h"
    1313#include "ariba/CommunicationListener.h"
     14
     15// reboost messages
     16#include "ariba/utility/transport/messages/message.hpp"
     17#include <ariba/utility/misc/sha1.h>
     18
     19#include "ariba/overlay/SequenceNumber.h"
     20
    1421
    1522namespace ariba {
     
    3744class LinkDescriptor {
    3845public:
     46    struct message_queue_entry
     47    {
     48        reboost::message_t message;
     49        uint8_t priority;
     50    };
     51   
    3952        // ctor
    4053        LinkDescriptor() {
     54            time_t now = time(NULL);
     55           
    4156                // default values
    4257                this->up = false;
     58//         this->closing = false;
     59        this->failed = false;
    4360                this->fromRemote = false;
    4461                this->remoteNode = NodeID::UNSPECIFIED;
     
    4663                this->communicationUp = false;
    4764                this->communicationId = LinkID::UNSPECIFIED;
    48                 this->keepAliveTime = time(NULL);
    49                 this->keepAliveMissed = 0;
     65                this->keepAliveReceived = now;
     66                this->keepAliveSent = now;
    5067                this->relaying     = false;
    51                 this->timeRelaying = time(NULL);
     68                this->timeRelaying = now;
    5269                this->dropAfterRelaying = false;
    5370                this->service  = ServiceID::UNSPECIFIED;
     
    5673                this->remoteLink = LinkID::UNSPECIFIED;
    5774                this->autolink = false;
    58                 this->lastuse = time(NULL);
     75                this->lastuse = now;
    5976                this->retryCounter = 0;
     77                this->hops = -1;
     78       
     79        this->transmit_seqnums = false; // XXX
    6080        }
    6181
     
    6787        // general information about the link --------------------------------------
    6888        bool up;           ///< flag whether this link is up and running
     89//      bool closing;      ///< flag, whether this link is in the regular process of closing
     90        bool failed;       ///< flag, whether communication is (assumed to be) not/no longer possible on this link
    6991        bool fromRemote;   ///< flag, whether this link was requested from remote
    7092        NodeID remoteNode; ///< remote end-point node
    71         bool isVital() {
    72                 return up && keepAliveMissed == 0;
    73         }
    74         bool isDirectVital() {
    75                 return isVital() && communicationUp && !relayed;
    76         }
    7793
    7894
     
    8298        bool   communicationUp;   ///< flag, whether the communication is up
    8399
     100        // sequence numbers --------------------------------------------------------
     101        SequenceNumber last_sent_seqnum;
     102    bool transmit_seqnums;
     103   
    84104        // direct link retries -----------------------------------------------------
    85105        EndpointDescriptor endpoint;
     
    87107
    88108        // link alive information --------------------------------------------------
    89         time_t keepAliveTime; ///< the last time a keep-alive message was received
    90         int keepAliveMissed;  ///< the number of missed keep-alive messages
     109        time_t keepAliveReceived; ///< the last time a keep-alive message was received
     110        time_t keepAliveSent;  ///< the number of missed keep-alive messages
    91111        void setAlive() {
    92                 keepAliveMissed = 0;
    93                 keepAliveTime = time(NULL);
     112//              keepAliveSent = time(NULL);
     113                keepAliveReceived = time(NULL);
    94114        }
    95115
     
    98118        LinkID remoteLink; ///< the remote link id
    99119        vector<NodeID> routeRecord;
     120        int hops;
    100121
    101122        // relay state -------------------------------------------------------------
     
    114135        // auto links --------------------------------------------------------------
    115136        bool autolink;  ///< flag, whether this link is a auto-link
    116         time_t lastuse; ///< time, when the link was last used
    117         deque<Message*> messageQueue; ///< waiting messages to be delivered
     137        time_t lastuse; ///< time, when the link was last used XXX AUTO_LINK-ONLY
     138        deque<message_queue_entry> messageQueue; ///< waiting messages to be delivered
    118139        void setAutoUsed() {
    119140                if (autolink) lastuse = time(NULL);
     
    121142        /// drops waiting auto-link messages
    122143        void flushQueue() {
    123                 BOOST_FOREACH( Message* msg, messageQueue )     delete msg;
     144//              BOOST_FOREACH( Message* msg, messageQueue )     delete msg;  // XXX MARIO: shouldn't be necessary anymore, since we're using shared pointers
    124145                messageQueue.clear();
    125146        }
     
    127148        // string representation ---------------------------------------------------
    128149        std::string to_string() const {
     150            time_t now = time(NULL);
     151           
    129152                std::ostringstream s;
     153        if ( relayed )
     154            s << "[RELAYED-";
     155        else
     156            s << "[DIRECT-";
     157        s << "LINK] ";
     158        s << "id=" << overlayId.toString().substr(0,4) << " ";
     159        s << "serv=" << service.toString() << " ";
    130160                s << "up=" << up << " ";
    131161                s << "init=" << !fromRemote << " ";
    132                 s << "id=" << overlayId.toString().substr(0,4) << " ";
    133                 s << "serv=" << service.toString() << " ";
    134162                s << "node=" << remoteNode.toString().substr(0,4) << " ";
    135163                s << "relaying=" << relaying << " ";
    136                 s << "miss=" << keepAliveMissed << " ";
     164                s << "last_received=" << now - keepAliveReceived << "s ";
    137165                s << "auto=" << autolink << " ";
     166                s << "hops=" << hops << " ";
    138167                if ( relayed ) {
    139168                        s << "| Relayed: ";
     
    146175                } else {
    147176                        s << "| Direct: ";
    148                         s << "using id=" << communicationId.toString().substr(0,4) << " ";
     177                        s << "using [COMMUNICATION-LINK] id=" << communicationId.toString().substr(0,4) << " ";
    149178                        s << "(up=" << communicationUp << ") ";
    150179                }
  • source/ariba/overlay/messages/JoinReply.cpp

    r3690 r12060  
    4444vsznDefault(JoinReply);
    4545
    46 JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed, const EndpointDescriptor _bootstrapEp)
    47         : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed ), bootstrapEp( _bootstrapEp ){
     46JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed)
     47        : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed )
     48{
    4849}
    4950
     
    6465}
    6566
    66 const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){
    67         return bootstrapEp;
    68 }
     67//const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){
     68//      return bootstrapEp;
     69//}
    6970
    7071}} // ariba::overlay
  • source/ariba/overlay/messages/JoinReply.h

    r5870 r12060  
    4040#define JOIN_REPLY_H__
    4141
    42 #include "ariba/utility/messages.h"
     42//#include "ariba/utility/messages.h"
     43#include "ariba/utility/messages/Message.h"
    4344#include "ariba/utility/serialization.h"
    4445#include "ariba/utility/types/SpoVNetID.h"
    4546#include "ariba/utility/types/NodeID.h"
    4647#include "ariba/utility/types/OverlayParameterSet.h"
    47 #include "ariba/communication/EndpointDescriptor.h"
     48//#include "ariba/communication/EndpointDescriptor.h"
    4849
    4950using ariba::utility::OverlayParameterSet;
     
    5152using ariba::utility::SpoVNetID;
    5253using ariba::utility::NodeID;
    53 using ariba::communication::EndpointDescriptor;
     54//using ariba::communication::EndpointDescriptor;
    5455
    5556namespace ariba {
     
    6465        OverlayParameterSet param; //< overlay parameters
    6566        bool joinAllowed; //< join successfull or access denied
    66         EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface
     67//      EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface
    6768
    6869public:
     
    7071                const SpoVNetID _spovnetid = SpoVNetID::UNSPECIFIED,
    7172                const OverlayParameterSet _param = OverlayParameterSet::DEFAULT,
    72                 bool _joinAllowed = false,
    73                 const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED()
     73                bool _joinAllowed = false  /*,
     74                const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED()*/
    7475        );
    7576
     
    7980        const OverlayParameterSet& getParam();
    8081        bool getJoinAllowed();
    81         const EndpointDescriptor& getBootstrapEndpoint();
     82//      const EndpointDescriptor& getBootstrapEndpoint();
    8283};
    8384
     
    8687sznBeginDefault( ariba::overlay::JoinReply, X ) {
    8788        uint8_t ja = joinAllowed;
    88         X && &spovnetid && param && bootstrapEp && ja;
     89        X && &spovnetid && param;
     90//      X && bootstrapEp;
     91        X && ja;
    8992        if (X.isDeserializer()) joinAllowed = ja;
    9093} sznEnd();
  • source/ariba/overlay/messages/OverlayMsg.h

    r10653 r12060  
    4747#include "ariba/utility/types/NodeID.h"
    4848#include "ariba/utility/types/LinkID.h"
    49 #include "ariba/communication/EndpointDescriptor.h"
    50 
     49// #include <ariba/utility/misc/sha1.h>
     50#include "ariba/overlay/SequenceNumber.h"
    5151
    5252namespace ariba {
     
    5757using ariba::utility::ServiceID;
    5858using ariba::utility::Message;
    59 using ariba::communication::EndpointDescriptor;
     59//using ariba::communication::EndpointDescriptor;
    6060using_serialization;
    6161
     
    6464 * between nodes.
    6565 *
    66  * @author Sebastian Mies <mies@tm.uka.de>
     66 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock
    6767 */
    6868class OverlayMsg: public Message { VSERIALIZEABLE;
     
    7575                maskTransfer    = 0x10, ///< bit mask for transfer messages
    7676                typeData        = 0x11, ///< message contains data for higher layers
     77                typeMessageLost = 0x12, ///< message contains info about a dropped message
    7778
    7879                // join signaling
     
    8788                typeLinkUpdate  = 0x33, ///< update message for link association
    8889                typeLinkDirect  = 0x34, ///< direct connection has been established
    89                 typeLinkAlive   = 0x35, ///< keep-alive message
     90                typeKeepAlive   = 0x35, ///< keep-alive message
     91                typeKeepAliveReply   = 0x36, ///< keep-alive message (replay)
     92                typeLinkClose   = 0x37,
    9093
    9194                /// DHT routed messages
     
    100103                maskDHTResponse = 0x50, ///< bit mask for dht responses
    101104                typeDHTData     = 0x51, ///< DHT get data
     105       
     106        /// misc message types
     107        typePing        = 0x44,
     108        typePong        = 0x45,
    102109
    103110                // topology signaling
     
    105112                typeSignalingEnd = 0xFF    ///< end of the signaling types
    106113        };
     114   
     115    /// message flags (uint8_t)
     116    enum flags_
     117    {
     118        flagRelayed         = 1 << 0,
     119        flagRegisterRelay   = 1 << 1,
     120        flagRouteRecord     = 1 << 2,
     121        flagSeqNum1         = 1 << 3,
     122        flagSeqNum2         = 1 << 4,
     123        flagAutoLink        = 1 << 5,
     124        flagLinkMessage     = 1 << 6,
     125        flagHasMoreFlags    = 1 << 7
     126    };
    107127
    108128        /// default constructor
     
    114134                const LinkID& _sourceLink      = LinkID::UNSPECIFIED,
    115135                const LinkID& _destinationLink = LinkID::UNSPECIFIED )
    116         :       type(type), flags(0), hops(0), ttl(10),
     136    :   type(type), flags(0), extended_flags(0), hops(0), ttl(10), priority(0),
    117137                service(_service),
    118138                sourceNode(_sourceNode), destinationNode(_destinationNode),
     
    125145        // copy constructor
    126146        OverlayMsg(const OverlayMsg& rhs)
    127         :       type(rhs.type), flags(rhs.flags), hops(rhs.hops), ttl(rhs.ttl),
    128                 service(rhs.service),
     147    :   type(rhs.type), flags(rhs.flags), extended_flags(rhs.extended_flags),
     148        hops(rhs.hops), ttl(rhs.ttl),
     149                priority(rhs.priority), service(rhs.service),
    129150                sourceNode(rhs.sourceNode), destinationNode(rhs.destinationNode),
    130151                sourceLink(rhs.sourceLink), destinationLink(rhs.destinationLink),
     
    149170        }
    150171
     172        /// priority ------------------------------------------------------------------
     173       
     174        uint8_t getPriority() const {
     175            return priority;
     176        }
     177       
     178        void setPriority(uint8_t priority) {
     179            this->priority = priority;
     180        }
     181       
    151182        /// flags ------------------------------------------------------------------
    152183
    153184        bool isRelayed() const {
    154                 return (flags & 0x01)!=0;
     185        return (flags & flagRelayed)!=0;
    155186        }
    156187
    157188        void setRelayed( bool relayed = true ) {
    158                 if (relayed) flags |= 1; else flags &= ~1;
     189        if (relayed) flags |= flagRelayed; else flags &= ~flagRelayed;
    159190        }
    160191
    161192        bool isRegisterRelay() const {
    162                 return (flags & 0x02)!=0;
     193                return (flags & flagRegisterRelay)!=0;
    163194        }
    164195
    165196        void setRegisterRelay( bool relayed = true ) {
    166                 if (relayed) flags |= 0x02; else flags &= ~0x02;
     197        if (relayed) flags |= flagRegisterRelay; else flags &= ~flagRegisterRelay;
    167198        }
    168199
    169200        bool isRouteRecord() const {
    170                 return (flags & 0x04)!=0;
     201                return (flags & flagRouteRecord)!=0;
    171202        }
    172203
    173204        void setRouteRecord( bool route_record = true ) {
    174                 if (route_record) flags |= 0x04; else flags &= ~0x04;
     205        if (route_record) flags |= flagRouteRecord; else flags &= ~flagRouteRecord;
    175206        }
    176207
    177208        bool isAutoLink() const {
    178                 return (flags & 0x80) == 0x80;
     209        return (flags & flagAutoLink) == flagAutoLink;
    179210        }
    180211
    181212        void setAutoLink(bool auto_link = true ) {
    182                 if (auto_link) flags |= 0x80; else flags &= ~0x80;
     213        if (auto_link) flags |= flagAutoLink; else flags &= ~flagAutoLink;
    183214        }
    184215
    185216        bool isLinkMessage() const {
    186                 return (flags & 0x40)!=0;
     217                return (flags & flagLinkMessage)!=0;
    187218        }
    188219
    189220        void setLinkMessage(bool link_info = true ) {
    190                 if (link_info) flags |= 0x40; else flags &= ~0x40;
    191         }
    192 
    193         bool containsSourceEndpoint() const {
    194                 return (flags & 0x20)!=0;
    195         }
    196 
    197         void setContainsSourceEndpoint(bool contains_endpoint) {
    198                 if (contains_endpoint) flags |= 0x20; else flags &= ~0x20;
    199         }
     221        if (link_info) flags |= flagLinkMessage; else flags &= ~flagLinkMessage;
     222        }
     223       
     224        bool hasExtendedFlags() const {
     225        return (flags & flagHasMoreFlags) == flagHasMoreFlags;
     226    }
    200227
    201228        /// number of hops and time to live ----------------------------------------
     
    264291                this->destinationLink = link;
    265292                setLinkMessage();
    266         }
    267 
    268         void setSourceEndpoint( const EndpointDescriptor& endpoint ) {
    269                 sourceEndpoint = endpoint;
    270                 setContainsSourceEndpoint(true);
    271         }
    272 
    273         const EndpointDescriptor& getSourceEndpoint() const {
    274                 return sourceEndpoint;
    275293        }
    276294
     
    284302                destinationLink = dummyLink;
    285303                hops = 0;
     304                routeRecord.clear();
    286305        }
    287306
     
    294313                        routeRecord.push_back(node);
    295314        }
     315       
     316        /// sequence numbers
     317        bool hasShortSeqNum() const
     318    {
     319        return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum1;
     320    }
     321   
     322    bool hasLongSeqNum() const
     323    {
     324        return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum2;
     325    }
     326   
     327    void setSeqNum(const SequenceNumber& sequence_number)
     328    {
     329        this->seqnum = sequence_number;
     330       
     331        // short seqnum
     332        if ( sequence_number.isShortSeqNum() )
     333        {
     334            flags |= flagSeqNum1;
     335            flags &= ~flagSeqNum2;
     336        }
     337        // longseqnum
     338        else if ( sequence_number.isShortSeqNum() )
     339        {
     340            flags &= ~flagSeqNum1;
     341            flags |= flagSeqNum2;
     342        }
     343        // no seqnum
     344        else
     345        {
     346            flags &= ~flagSeqNum1;
     347            flags &= ~flagSeqNum2;
     348        }
     349    }
     350   
     351    const SequenceNumber& getSeqNum() const
     352    {
     353        return seqnum;
     354    }
     355   
    296356
    297357private:
    298         uint8_t type, flags, hops, ttl;
     358        uint8_t type, flags, extended_flags, hops, ttl, priority;
    299359        ServiceID service;
    300360        NodeID sourceNode;
     
    302362        LinkID sourceLink;
    303363        LinkID destinationLink;
    304         EndpointDescriptor sourceEndpoint;
     364//      EndpointDescriptor sourceEndpoint;
    305365        vector<NodeID> routeRecord;
     366    SequenceNumber seqnum;
    306367};
    307368
     
    311372sznBeginDefault( ariba::overlay::OverlayMsg, X ){
    312373        // header
    313         X && type && flags && hops && ttl;
     374        X && type && flags;
     375   
     376    if ( hasExtendedFlags() )
     377        X && extended_flags;
     378   
     379    X && hops && ttl;
    314380
    315381        // addresses
    316382        X && &service && &sourceNode && &destinationNode;
     383       
     384        // priority
     385        X && priority;
    317386
    318387        // message is associated with a end-to-end link
     
    320389                X && &sourceLink && &destinationLink;
    321390
    322         // message is associated with a source end-point
    323         if (containsSourceEndpoint())
    324                 X && sourceEndpoint;
    325 
     391   
     392    /* seqnum */
     393    // serialize
     394    if ( X.isSerializer() )
     395    {
     396        if ( hasShortSeqNum() )
     397        {
     398            uint32_t short_seqnum;
     399            short_seqnum = seqnum.getShortSeqNum();
     400            X && short_seqnum;
     401        }
     402        if ( hasLongSeqNum() )
     403        {
     404            uint64_t long_seqnum;
     405            long_seqnum = seqnum.getLongSeqNum();
     406            X && long_seqnum;
     407        }
     408    }
     409    // deserialize
     410    else
     411    {
     412        if ( hasShortSeqNum() )
     413        {
     414            uint32_t short_seqnum;
     415            X && short_seqnum;
     416            seqnum = ariba::overlay::SequenceNumber(short_seqnum);
     417        }
     418        if ( hasLongSeqNum() )
     419        {
     420            uint64_t long_seqnum;
     421            X && long_seqnum;
     422            seqnum = ariba::overlay::SequenceNumber(long_seqnum);
     423        }       
     424    }
     425   
     426   
    326427        // message should record its route
    327428        if (isRouteRecord()) {
     
    333434
    334435        // payload
    335         X && Payload();
     436//      X && Payload();
    336437} sznEnd();
    337438
  • source/ariba/overlay/modules/OverlayFactory.cpp

    r3718 r12060  
    4141// structured overlays
    4242#include "chord/Chord.h"
    43 #include "onehop/OneHop.h"
     43//#include "onehop/OneHop.h"  //DEPRECATED
    4444
    4545namespace ariba {
     
    5757                        return new Chord( baseoverlay, nodeid, routeReceiver, param );
    5858
    59                 case OverlayParameterSet::OverlayStructureOneHop:
    60                         return new OneHop( baseoverlay, nodeid, routeReceiver, param );
     59//              case OverlayParameterSet::OverlayStructureOneHop:
     60//                      return new OneHop( baseoverlay, nodeid, routeReceiver, param );
    6161
    6262                default:
     63                    // NEVER return "NULL"
     64                    assert(false);
     65                    throw 42;
    6366                        return NULL;
    6467        }
    6568
     69        // NEVER return "NULL"
     70        assert(false);
     71        throw 42;
    6672        return NULL;
    6773}
  • source/ariba/overlay/modules/OverlayInterface.cpp

    r6854 r12060  
    6868
    6969void OverlayInterface::onLinkFail(const LinkID& lnk, const NodeID& remote) {
     70    onLinkDown(lnk, remote);
    7071}
    7172
     
    7980}
    8081
    81 void OverlayInterface::onMessage(const DataMessage& msg, const NodeID& remote,
     82void OverlayInterface::onMessage(OverlayMsg* msg,
     83        reboost::shared_buffer_t sub_msg,
     84        const NodeID& remote,
    8285                const LinkID& lnk) {
    8386}
  • source/ariba/overlay/modules/OverlayInterface.h

    r10573 r12060  
    145145         */
    146146        virtual const LinkID& getNextLinkId( const NodeID& id ) const = 0;
    147        
     147
     148    /**
     149     * Returns link ids of possible next hops a route message could take,
     150     * sorted by "quality" (e.g. overlay-distance).
     151     *
     152     * The »num« parameter can be used to specify the desired number of elements
     153     * in the returned vector. This is intendet for optimizations. The
     154     * implementation may choose to return a different number of elements than
     155     * requested.
     156     *
     157     * NOTE: The returned vector may contain »unspecified« links. These refer to
     158     * to the own node. (e.g. If there's no closer node, the top element in the
     159     * returned vector is unsoecified.)
     160     *
     161     * @param id The destination node id
     162     * @param num The desired number of elements in the returned vector.
     163     *            (0 means »not specified/max)«
     164     * @return A sorted vector of link ids to possible next hops.
     165     */
     166    virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode(
     167        const NodeID& id, int num = 0 ) const = 0;
     168
    148169        /**
    149170         * Returns the NodeID of the next hop a route message would take.
     
    176197
    177198        /// @see CommunicationListener
    178         virtual void onMessage(const DataMessage& msg, const NodeID& remote,
     199        virtual void onMessage(OverlayMsg* msg,
     200                reboost::shared_buffer_t sub_msg,
     201                const NodeID& remote,
    179202                        const LinkID& lnk = LinkID::UNSPECIFIED);
    180203
  • source/ariba/overlay/modules/OverlayStructureEvents.h

    r5151 r12060  
    4141
    4242#include "ariba/utility/types/NodeID.h"
    43 #include "ariba/utility/messages.h"
     43#include "ariba/utility/types/LinkID.h"
     44#include "ariba/utility/messages/Message.h"
    4445
    4546using ariba::utility::NodeID;
     47using ariba::utility::LinkID;
    4648using ariba::utility::Message;
    4749
     
    5052
    5153class OverlayInterface;
    52 class OneHop;
     54//class OneHop;
    5355
    5456class OverlayStructureEvents {
    5557        friend class ariba::overlay::OverlayInterface;
    56         friend class ariba::overlay::OneHop;
     58//      friend class ariba::overlay::OneHop;
    5759
    5860public:
  • source/ariba/overlay/modules/chord/Chord.cpp

    r10572 r12060  
    4343#include "detail/chord_routing_table.hpp"
    4444
    45 #include "messages/Discovery.h"
     45//#include "messages/Discovery.h"  // XXX DEPRECATED
    4646
    4747namespace ariba {
     
    5555typedef chord_routing_table::item route_item;
    5656
     57using ariba::transport::system_priority;
     58
    5759use_logging_cpp( Chord );
     60
     61
     62////// Messages
     63struct DiscoveryMessage
     64{
     65    /**
     66     * DiscoveryMessage
     67     * - type
     68     * - data
     69     * - Endpoint
     70     */
     71
     72    // type enum
     73    enum type_ {
     74        invalid = 0,
     75        normal = 1,
     76        successor = 2,
     77        predecessor = 3
     78    };
     79
     80   
     81    // data
     82    uint8_t type;
     83    uint8_t ttl;
     84    EndpointDescriptor endpoint;
     85
     86    // serialize
     87    reboost::message_t serialize()
     88    {
     89        // serialize endpoint
     90        reboost::message_t msg = endpoint.serialize();
     91       
     92        // serialize type and ttl
     93        uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
     94        buff1[0] = type;
     95        buff1[1] = ttl;
     96       
     97        return msg;
     98    }
     99   
     100    //deserialize
     101    reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
     102    {
     103        // deserialize type and ttl
     104        const uint8_t* bytes = buff.data();
     105        type = bytes[0];
     106        ttl = bytes[1];
     107       
     108        // deserialize endpoint
     109        return endpoint.deserialize(buff(2*sizeof(uint8_t)));
     110    }
     111};
     112
    58113
    59114Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
     
    102157
    103158/// helper: sends a message using the "base overlay"
    104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
    105         if (link.isUnspecified()) return 0;
    106         return baseoverlay.send_link( msg, link );
     159void Chord::send( OverlayMsg* msg, const LinkID& link ) {
     160        if (link.isUnspecified())
     161        return;
     162   
     163        baseoverlay.send_link( msg, link, system_priority::OVERLAY );
     164}
     165
     166void Chord::send_node( OverlayMsg* message, const NodeID& remote )
     167{
     168    try
     169    {
     170        baseoverlay.send( message, remote, system_priority::OVERLAY );
     171    }
     172    catch ( message_not_sent& e )
     173    {
     174        logging_warn("Chord: Could not send message to " << remote
     175                << ": " << e.what());
     176    }
    107177}
    108178
     
    116186        OverlayMsg msg( typeDiscovery );
    117187        msg.setRegisterRelay(true);
    118         Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() );
    119         msg.encapsulate(&dmsg);
     188       
     189        // create DiscoveryMessage
     190        DiscoveryMessage dmsg;
     191        dmsg.type = DiscoveryMessage::normal;
     192        dmsg.ttl = ttl;
     193        dmsg.endpoint = baseoverlay.getEndpointDescriptor();
     194
     195        msg.set_payload_message(dmsg.serialize());
    120196
    121197        // send to node
    122         baseoverlay.send_node( &msg, remote );
     198    try
     199    {
     200        baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
     201    }
     202    catch ( message_not_sent& e )
     203    {
     204        logging_warn("Chord: Could not send message to " << remote
     205                << ": " << e.what());
     206    }
    123207}
    124208
    125209void Chord::discover_neighbors( const LinkID& link ) {
    126210        uint8_t ttl = 1;
     211       
     212    // FIXME try-catch for the send operations
     213   
     214    // create DiscoveryMessage
     215    DiscoveryMessage dmsg;
     216    dmsg.ttl = ttl;
     217    dmsg.endpoint = baseoverlay.getEndpointDescriptor();
    127218        {
    128219                // send predecessor discovery
    129220                OverlayMsg msg( typeDiscovery );
    130221                msg.setRegisterRelay(true);
    131                 Discovery dmsg( Discovery::predecessor, ttl,
    132                         baseoverlay.getEndpointDescriptor() );
    133                 msg.encapsulate(&dmsg);
     222
     223                // set type
     224            dmsg.type = DiscoveryMessage::predecessor;
     225
     226            // send
     227            msg.set_payload_message(dmsg.serialize());
    134228                send(&msg, link);
    135229        }
     
    137231                // send successor discovery
    138232                OverlayMsg msg( typeDiscovery );
    139                 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
     233//              msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );  // XXX this was redundand, wasn't it?
    140234                msg.setRegisterRelay(true);
    141                 Discovery dmsg( Discovery::successor, ttl,
    142                         baseoverlay.getEndpointDescriptor() );
    143                 msg.encapsulate(&dmsg);
     235
     236                // set type
     237        dmsg.type = DiscoveryMessage::successor;
     238
     239        // send
     240        msg.set_payload_message(dmsg.serialize());
    144241                send(&msg, link);
    145242        }
     
    163260
    164261        // timer for stabilization management
    165         Timer::setInterval(1000);
     262//      Timer::setInterval(1000);  // TODO find an appropriate interval!
     263        Timer::setInterval(10000);  // XXX testing...
    166264        Timer::start();
    167265}
     
    200298        return item->info;
    201299}
     300
     301std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode(
     302    const NodeID& id, int num ) const
     303{
     304    std::vector<const LinkID*> ret;
     305   
     306    switch ( num )
     307    {
     308        // special case: just call »getNextLinkId«
     309        case 1:
     310        {
     311            ret.push_back(&getNextLinkId(id));
     312           
     313            break;
     314        }
     315       
     316        // * calculate top 2 *
     317        case 0:
     318        case 2:
     319        {
     320            std::vector<const route_item*> items = table->get_next_2_hops(id);
     321           
     322            ret.reserve(items.size());
     323           
     324            BOOST_FOREACH( const route_item* item, items )
     325            {
     326                ret.push_back(&item->info);
     327            }
     328           
     329            break;
     330        }
     331       
     332        // NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
     333        default:
     334        {
     335            throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");
     336           
     337            break;
     338        }
     339    }
     340   
     341    return ret;
     342}
     343
    202344
    203345/// @see OverlayInterface.h
     
    253395        if (remote==nodeid) {
    254396                logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
     397                logging_warn("NodeID: " << remote);
    255398                baseoverlay.dropLink(lnk);
    256399                return;
     
    290433/// @see CommunicationListener.h or @see OverlayInterface.h
    291434void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
    292         logging_debug("link_down: link=" << lnk.toString() << " remote=" <<
     435    // XXX logging_debug
     436        logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
    293437                        remote.toString() );
    294438
     
    303447/// @see CommunicationListener.h
    304448/// @see OverlayInterface.h
    305 void Chord::onMessage(const DataMessage& msg, const NodeID& remote,
     449void Chord::onMessage(OverlayMsg* msg,
     450        reboost::shared_buffer_t sub_msg,
     451        const NodeID& remote,
    306452                const LinkID& link) {
    307453
    308         // decode message
    309         OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
    310         if (m == NULL) return;
    311 
    312454        // handle messages
    313         switch ((signalMessageTypes)m->getType()) {
     455        switch ((signalMessageTypes) msg->getType()) {
    314456
    315457        // discovery request
    316         case typeDiscovery: {
    317                 // decapsulate message
    318                 Discovery* dmsg = m->decapsulate<Discovery> ();
     458        case typeDiscovery:
     459        {
     460                // deserialize discovery message
     461                DiscoveryMessage dmsg;
     462                dmsg.deserialize(sub_msg);
     463               
    319464                logging_debug("Received discovery message with"
    320                             << " src=" << m->getSourceNode().toString()
    321                                 << " dst=" << m->getDestinationNode().toString()
    322                                 << " ttl=" << (int)dmsg->getTTL()
    323                                 << " type=" << (int)dmsg->getType()
     465                            << " src=" << msg->getSourceNode().toString()
     466                                << " dst=" << msg->getDestinationNode().toString()
     467                                << " ttl=" << (int)dmsg.ttl
     468                                << " type=" << (int)dmsg.type
    324469                );
    325470
     
    327472                bool found = false;
    328473                BOOST_FOREACH( NodeID& value, discovery )
    329                         if (value == m->getSourceNode()) {
     474                        if (value == msg->getSourceNode()) {
    330475                                found = true;
    331476                                break;
    332477                        }
    333                 if (!found) discovery.push_back(m->getSourceNode());
     478                if (!found) discovery.push_back(msg->getSourceNode());
    334479
    335480                // check if source node can be added to routing table and setup link
    336                 if (m->getSourceNode() != nodeid)
    337                         setup( dmsg->getEndpoint(), m->getSourceNode() );
     481                if (msg->getSourceNode() != nodeid)
     482                        setup( dmsg.endpoint, msg->getSourceNode() );
    338483
    339484                // process discovery message -------------------------- switch start --
    340                 switch (dmsg->getType()) {
    341 
    342                 // normal: route discovery message like every other message
    343                 case Discovery::normal: {
    344                         // closest node? yes-> split to follow successor and predecessor
    345                         if ( table->is_closest_to(m->getDestinationNode()) ) {
    346                                 logging_debug("Discovery split:");
    347                                 if (!table->get_successor()->isUnspecified()) {
    348                                         OverlayMsg omsg(*m);
    349                                         dmsg->setType(Discovery::successor);
    350                                         omsg.encapsulate(dmsg);
    351                                         logging_debug("* Routing to successor "
    352                                                         << table->get_successor()->toString() );
    353                                         baseoverlay.send( &omsg, *table->get_successor() );
    354                                 }
    355 
    356                                 // send predecessor message
    357                                 if (!table->get_predesessor()->isUnspecified()) {
    358                                         OverlayMsg omsg(*m);
    359                                         dmsg->setType(Discovery::predecessor);
    360                                         omsg.encapsulate(dmsg);
    361                                         logging_debug("* Routing to predecessor "
    362                                                         << table->get_predesessor()->toString() );
    363                                         baseoverlay.send( &omsg, *table->get_predesessor() );
    364                                 }
    365                         }
    366                         // no-> route message
    367                         else {
    368                                 baseoverlay.route( m );
    369                         }
    370                         break;
    371                 }
    372 
    373                 // successor mode: follow the successor until TTL is zero
    374                 case Discovery::successor:
    375                 case Discovery::predecessor: {
    376                         // reached destination? no->forward!
    377                         if (m->getDestinationNode() != nodeid) {
    378                                 OverlayMsg omsg(*m);
    379                                 omsg.encapsulate(dmsg);
    380                                 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
    381                                 baseoverlay.route( &omsg );
    382                                 break;
    383                         }
    384 
    385                         // time to live ended? yes-> stop routing
    386                         if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break;
    387 
    388                         // decrease time-to-live
    389                         dmsg->setTTL(dmsg->getTTL() - 1);
    390 
    391                         const route_item* item = NULL;
    392                         if (dmsg->getType() == Discovery::successor &&
    393                                         table->get_successor() != NULL) {
    394                                 item = table->get(*table->get_successor());
    395                         } else {
    396                                 if (table->get_predesessor()!=NULL)
    397                                         item = table->get(*table->get_predesessor());
    398                         }
    399                         if (item == NULL)
    400                                 break;
    401 
    402                         logging_debug("Routing discovery message to succ/pred "
    403                                 << item->id.toString() );
    404                         OverlayMsg omsg(*m);
    405                         omsg.encapsulate(dmsg);
    406                         omsg.setDestinationNode(item->id);
    407                         omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
    408                         baseoverlay.send(&omsg, omsg.getDestinationNode());
    409                         break;
    410                 }
    411                 case Discovery::invalid:
    412                         break;
    413 
    414                 default:
    415                         break;
    416                 }
    417                 // process discovery message ---------------------------- switch end --
    418 
    419                 delete dmsg;
    420                 break;
    421         }
    422 
    423         // leave
    424         case typeLeave: {
    425                 if (link!=LinkID::UNSPECIFIED) {
    426                         route_item* item = table->get(remote);
    427                         if (item!=NULL) item->info = LinkID::UNSPECIFIED;
    428                         table->remove(remote);
    429                         baseoverlay.dropLink(link);
    430                 }
    431                 break;
    432         }}
     485                switch ( dmsg.type )
     486                {
     487            // normal: route discovery message like every other message
     488            case DiscoveryMessage::normal:
     489            {
     490                // closest node? yes-> split to follow successor and predecessor
     491                if ( table->is_closest_to(msg->getDestinationNode()) )
     492                {
     493                    logging_debug("Discovery split:");
     494                    if (!table->get_successor()->isUnspecified())
     495                    {
     496                        OverlayMsg omsg(*msg);
     497                       
     498                        dmsg.type = DiscoveryMessage::successor;
     499                        omsg.set_payload_message(dmsg.serialize());
     500
     501                        logging_debug("* Routing to successor "
     502                                << table->get_successor()->toString() );
     503                        send_node( &omsg, *table->get_successor() );
     504                    }
     505   
     506                    // send predecessor message
     507                    if (!table->get_predesessor()->isUnspecified())
     508                    {
     509                        OverlayMsg omsg(*msg);
     510                       
     511                        dmsg.type = DiscoveryMessage::predecessor;
     512                        omsg.set_payload_message(dmsg.serialize());
     513
     514                        logging_debug("* Routing to predecessor "
     515                                << table->get_predesessor()->toString() );
     516                        send_node( &omsg, *table->get_predesessor() );
     517                    }
     518                }
     519                // no-> route message
     520                else
     521                {
     522                    baseoverlay.route( msg );
     523                }
     524                break;
     525            }
     526   
     527            // successor mode: follow the successor until TTL is zero
     528            case DiscoveryMessage::successor:
     529            case DiscoveryMessage::predecessor:
     530            {
     531                // reached destination? no->forward!
     532                if (msg->getDestinationNode() != nodeid)
     533                {
     534                    OverlayMsg omsg(*msg);
     535                    omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
     536                   
     537                    omsg.set_payload_message(dmsg.serialize());
     538                   
     539                    baseoverlay.route( &omsg );
     540                    break;
     541                }
     542   
     543                // time to live ended? yes-> stop routing
     544                if (dmsg.ttl == 0 || dmsg.ttl > 10) break;
     545   
     546                // decrease time-to-live
     547                dmsg.ttl--;
     548   
     549                const route_item* item = NULL;
     550                if (dmsg.type == DiscoveryMessage::successor &&
     551                        table->get_successor() != NULL)
     552                {
     553                    item = table->get(*table->get_successor());
     554                }
     555                else if (table->get_predesessor() != NULL)
     556                {
     557                        item = table->get(*table->get_predesessor());
     558                }
     559                if (item == NULL)
     560                    break;
     561   
     562                logging_debug("Routing discovery message to succ/pred "
     563                    << item->id.toString() );
     564                OverlayMsg omsg(*msg);
     565                omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
     566                omsg.setDestinationNode(item->id);
     567               
     568                omsg.set_payload_message(dmsg.serialize());
     569               
     570                send_node( &omsg, omsg.getDestinationNode() );
     571                break;
     572            }
     573            case DiscoveryMessage::invalid:
     574                break;
     575   
     576            default:
     577                break;
     578            }
     579            // process discovery message ---------------------------- switch end --
     580   
     581            break;
     582        }
     583   
     584        // leave
     585        case typeLeave: {
     586            if (link!=LinkID::UNSPECIFIED) {
     587                route_item* item = table->get(remote);
     588                if (item!=NULL) item->info = LinkID::UNSPECIFIED;
     589                table->remove(remote);
     590                baseoverlay.dropLink(link);
     591            }
     592            break;
     593        }
     594        }
    433595}
    434596
  • source/ariba/overlay/modules/chord/Chord.h

    r10572 r12060  
    4545#include "../OverlayInterface.h"
    4646#include <vector>
     47#include <stdexcept>
    4748
    4849class chord_routing_table;
     
    8788                const NodeID& node = NodeID::UNSPECIFIED );
    8889
    89         // helper: sends a message using the "base overlay"
    90         seqnum_t send( OverlayMsg* msg, const LinkID& link );
     90        // helper: sends a message over a link using the "base overlay"
     91        void send( OverlayMsg* msg, const LinkID& link );
    9192
     93        // helper: sends a message to a node using the "base overlay"
     94        void send_node( OverlayMsg* message, const NodeID& remote );
     95               
    9296        // stabilization: sends a discovery message to the specified neighborhood
    9397        void send_discovery_to( const NodeID& destination, int ttl = 3 );
     
    105109        virtual const LinkID& getNextLinkId( const NodeID& id ) const;
    106110       
     111    /// @see OverlayInterface.h
     112    /// NOTE: This implementation excepts num == 2
     113    virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode(
     114        const NodeID& id, int num = 0 ) const;
     115
    107116        /// @see OverlayInterface.h
    108117        virtual const NodeID& getNextNodeId( const NodeID& id ) const;
     
    138147
    139148        /// @see CommunicationListener.h or @see OverlayInterface.h
    140         virtual void onMessage(const DataMessage& msg, const NodeID& remote,
     149        virtual void onMessage(OverlayMsg* msg,
     150                reboost::shared_buffer_t sub_msg,
     151                const NodeID& remote,
    141152                        const LinkID& lnk = LinkID::UNSPECIFIED);
    142153
  • source/ariba/overlay/modules/chord/detail/chord_routing_table.hpp

    r8606 r12060  
    9696        // the own node id
    9797        nodeid_t id;
     98   
     99    // the own node id as routing item
     100    item own_id_item;
    98101
    99102        // successor and predecessor tables
     
    168171        /// constructs the reactive chord routing table
    169172        explicit chord_routing_table( const nodeid_t& id, int redundancy = 4 ) :
    170                 id(id), succ( redundancy, succ_compare_type(this->id), *this ),
    171                 pred( redundancy, pred_compare_type(this->id), *this ) {
     173                id(id),
     174                succ( redundancy, succ_compare_type(this->id), *this ),
     175                pred( redundancy, pred_compare_type(this->id), *this )
     176    {
     177        // init reflexive item
     178        own_id_item.id = id;
     179        own_id_item.ref_count = 1;
    172180
    173181                // create finger tables
     
    273281                        }
    274282                }
     283
    275284                if (best_item != NULL && distance(value, id)<distance(value, best_item->id))
    276285                        return NULL;
    277286                return best_item;
    278287        }
     288       
     289        std::vector<const item*> get_next_2_hops( const nodeid_t& value)
     290    {
     291        ring_distance distance;
     292        item* best_item = &own_id_item;
     293        item* second_best_item = NULL;
     294       
     295        // find best and second best item
     296        for (size_t i=0; i<table.size(); i++)
     297        {
     298            item* curr = &table[i];
     299
     300            // not not include orphans into routing!
     301            if (curr->ref_count==0) continue;
     302
     303            // check if we found a better item
     304            // is best item
     305            if ( distance(value, curr->id) < distance(value, best_item->id) )
     306            {
     307                second_best_item = best_item;
     308                best_item = curr;
     309            }
     310            // is second best item
     311            else
     312            {
     313                if ( second_best_item == NULL )
     314                {
     315                    second_best_item = curr;
     316                    continue;
     317                }
     318               
     319                if ( distance(value, curr->id) < distance(value, second_best_item->id) )
     320                {
     321                    second_best_item = curr;
     322                }
     323            }
     324        }
     325
     326        // prepare return vector
     327        std::vector<const item*> ret;
     328        if ( best_item != NULL )
     329        {
     330            ret.push_back(best_item);
     331        }
     332        if ( second_best_item != NULL )
     333        {
     334            ret.push_back(second_best_item);
     335        }
     336       
     337        return ret;
     338    }
    279339
    280340        const nodeid_t* get_successor() {
    281                 if (succ.size()==NULL) return NULL;
     341                if (succ.size()==0) return NULL;
    282342                return &succ.front();
    283343        }
    284344
    285345        const nodeid_t* get_predesessor() {
    286                 if (pred.size()==NULL) return NULL;
     346                if (pred.size()==0) return NULL;
    287347                return &pred.front();
    288348        }
  • source/ariba/overlay/modules/chord/messages/CMakeLists.txt

    r10700 r12060  
    3737# [License]
    3838
    39 add_headers(Discovery.h)
     39#add_headers(Discovery.h)
    4040
    41 add_sources(Discovery.cpp)
     41#add_sources(Discovery.cpp)
  • source/ariba/overlay/modules/chord/messages/Discovery.h

    r5870 r12060  
    5959using_serialization;
    6060
     61// XXX This whole message is DEPRECATED
    6162class Discovery : public Message {
    6263        VSERIALIZEABLE;
  • source/ariba/overlay/modules/onehop/CMakeLists.txt

    r10700 r12060  
    3737# [License]
    3838
    39 add_headers(OneHop.h)
    4039
    41 add_sources(OneHop.cpp)
     40## DEPRECATED
     41#add_headers(OneHop.h)
    4242
    43 add_subdir_sources(messages)
     43#add_sources(OneHop.cpp)
     44
     45#add_subdir_sources(messages)
  • source/ariba/utility/CMakeLists.txt

    r10700 r12060  
    4545add_subdir_sources(
    4646    addressing
     47    addressing2
    4748    bootstrap
    4849    configuration
    4950    internal
    5051    logging
    51     measurement
    5252    messages
    5353    misc
  • source/ariba/utility/addressing

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/addressing/endpoint_set.hpp

    r10653 r12060  
    1 #ifndef ENDPOINT_SET_HPP_
    2 #define ENDPOINT_SET_HPP_
     1#ifndef ENDPOINT_SET_HPP_DEPRECATED_
     2#define ENDPOINT_SET_HPP_DEPRECATED_
    33
    44#include "addressing.hpp"
  • source/ariba/utility/addressing/facades/address_v.hpp

    r10789 r12060  
    1313
    1414#include "../detail/address_convenience.hpp"
     15
     16#include <boost/shared_ptr.hpp>
    1517
    1618namespace ariba {
     
    2628class address_v: public detail::address_convenience<address_v> {
    2729public:
     30    typedef boost::shared_ptr<address_v> shared_ptr;
     31   
    2832        virtual ~address_v() {}
    2933       
  • source/ariba/utility/addressing/ip_address.hpp

    r6919 r12060  
    1 #ifndef IP_ADDRESS_HPP_
    2 #define IP_ADDRESS_HPP_
     1#ifndef IP_ADDRESS_HPP_DEPRECATED_
     2#define IP_ADDRESS_HPP_DEPRECATED_
    33
    44#include <string>
  • source/ariba/utility/addressing/tcpip_endpoint.hpp

    r5284 r12060  
    1 #ifndef TCPIP_ENDPOINT_HPP_
    2 #define TCPIP_ENDPOINT_HPP_
     1#ifndef TCPIP_ENDPOINT_HPP_DEPRECATED_
     2#define TCPIP_ENDPOINT_HPP_DEPRECATED_
    33
    44#include<string>
  • source/ariba/utility/bootstrap/modules/bluetoothsdp

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h

    r10653 r12060  
    4040#define __PERIODIC_BROADCAST_H
    4141
     42// ariba
    4243#include "ariba/config.h"
    43 
     44#include "ariba/utility/bootstrap/modules/BootstrapModule.h"
     45#include "ariba/utility/logging/Logging.h"
     46#include "ariba/utility/system/Timer.h"
     47
     48// ariba messages
     49#include "PeriodicBroadcastMessage.h"
     50
     51// (ariba) link-local
     52#include "ariba/utility/transport/StreamTransport/StreamTransport.hpp"
     53
     54// system
    4455#include <map>
    4556#include <string>
    4657#include <ctime>
    4758#include <iostream>
     59
     60// boost
    4861#include <boost/asio.hpp>
    4962#include <boost/foreach.hpp>
    5063#include <boost/thread/mutex.hpp>
    5164#include <boost/thread/thread.hpp>
    52 #include "ariba/utility/bootstrap/modules/BootstrapModule.h"
    53 #include "ariba/utility/logging/Logging.h"
    54 #include "ariba/utility/system/Timer.h"
    55 #include "PeriodicBroadcastMessage.h"
    56 
    57 //link-local
    58 #include "ariba/utility/transport/tcpip/tcpip.hpp"
     65
    5966
    6067using std::map;
     
    301308                               
    302309                                // include all link-local interfaces
    303                                 vector<uint64_t> scope_ids = ariba::transport::tcpip::get_interface_scope_ids();
     310                                vector<uint64_t> scope_ids = ariba::transport::get_interface_scope_ids();
    304311                               
    305312                                BOOST_FOREACH ( uint64_t id, scope_ids )
  • source/ariba/utility/logging/Logging.h

    r10700 r12060  
    9797
    9898  static int __loglevel__ = 2; //default is info
     99//  static int __loglevel__ = 1; // XXX use higher log level
    99100
    100101  #define logging_trace(x)  {                                   logging_stdout(x);                }
  • source/ariba/utility/messages.h

    r3690 r12060  
    4040#define MESSAGES_H_
    4141
     42// TODO wÃŒrde sagen das brauchen wir nicht mehr
     43//   ---> ÃŒberall dieses include rausnehmen und ggf  #include "messages/Message.h"   einfÃŒgen..
     44
    4245#include "messages/Message.h"
    43 #include "messages/MessageSender.h"
    44 #include "messages/MessageReceiver.h"
    45 #include "messages/MessageUtilities.h"
    46 #include "messages/MessageProvider.h"
    47 #include "messages/TextMessage.h"
     46//#include "messages/MessageSender.h"  // TODO wird das noch genutzt..? Wenn nein, sollte es weg!
     47//#include "messages/MessageReceiver.h"
     48//#include "messages/MessageUtilities.h"
     49////#include "messages/MessageProvider.h"  // DEPRECATED
     50//#include "messages/TextMessage.h"
    4851
    4952#endif /* MESSAGES_H_ */
  • source/ariba/utility/messages/Message.cpp

    r8620 r12060  
    4141#include "ariba/utility/serialization/DataStream.hpp"
    4242
     43#include "ariba/utility/logging/Logging.h"
     44
    4345NAMESPACE_BEGIN
    4446
     
    8082}
    8183
     84
     85reboost::message_t Message::wrap_up_for_sending()
     86{
     87    assert( ! wrapped_up );
     88    wrapped_up = true;
     89   
     90    //// Adapt to new message system ////
     91    Data data = data_serialize(this, DEFAULT_V);
     92    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     93
     94    newstyle_payload.push_front(buf);
     95   
     96    return newstyle_payload;
     97}
     98
     99reboost::shared_buffer_t Message::serialize_into_shared_buffer()
     100{
     101    assert ( newstyle_payload.length() == 0 );
     102   
     103    //// Adapt to new message system ////
     104    Data data = data_serialize(this, DEFAULT_V);
     105    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     106   
     107    return buf;
     108}
     109
     110
     111reboost::shared_buffer_t Message::deserialize_from_shared_buffer(reboost::shared_buffer_t buff)
     112{
     113    // NOTE: legacy payload is not allowed when using shared buffers
     114    this->legacy_payload_disabled = true;
     115   
     116    assert( buff.size() > 0 );
     117   
     118    // const_cast is necessary here, but without legacy payload we should be save here (more or less)
     119    Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8);
     120   
     121    size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8;
     122
     123    // return remaining sub-buffer
     124    return buff(len);
     125}
     126
     127
     128
    82129NAMESPACE_END
    83130
    84131std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg ) {
    85132        using_serialization;
    86         stream << "msg(type=" << typeid(msg).name();
     133        stream << "msg(type=" << typeid(msg).name() << ",";
    87134        stream << "len=" << (data_length(&msg)/8) << ",";
    88135        Data data = data_serialize(&msg);
    89         stream << ",data=" << data;
     136        stream << "data=" << data;
    90137        data.release();
    91138        stream << ")";
    92139        return stream;
    93140}
    94 
  • source/ariba/utility/messages/Message.h

    r6919 r12060  
    6262#include "ariba/utility/serialization.h"
    6363
     64// reboost messages
     65#include "ariba/utility/transport/messages/message.hpp"
     66
     67
    6468std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    6569
     
    8286        friend std::ostream& ::operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    8387
    84         // root binary data
    85         shared_array<uint8_t> root;
    86 
    8788        // payload
     89        bool legacy_payload_disabled;
    8890        bool releasePayload;
    8991        Data payload; //< messages binary data
     92       
     93        // XXX testing...
     94        reboost::message_t newstyle_payload;
     95        bool wrapped_up;
    9096
    9197        // addresses and control info
     
    98104         */
    99105        inline Message() :
    100                 root(), releasePayload(true), payload(), srcAddr(NULL),destAddr(NULL) {
     106            legacy_payload_disabled(false), releasePayload(true), payload(),
     107            newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {
    101108        }
    102109
     
    106113         */
    107114        explicit inline Message( const Data& data ) :
    108                 releasePayload(true), srcAddr(NULL),destAddr(NULL) {
     115        legacy_payload_disabled(false), releasePayload(true),
     116        newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {  // FIXME newstyle_payload..?
    109117                this->payload = data.clone();
    110118//              this->root = shared_array<uint8_t>((uint8_t*)data.getBuffer());
     
    225233                return decapsulate<T>();
    226234        }
     235       
     236       
     237        // XXX testing
     238        void set_payload_message(reboost::message_t msg)
     239        {
     240            newstyle_payload = msg;
     241        }
     242       
     243        void append_buffer(reboost::shared_buffer_t buff)
     244        {
     245            newstyle_payload.push_back(buff);
     246        }
     247       
     248       
     249        // XXX testing... packs this message into the payload message (do not use twice!!)
     250        virtual reboost::message_t wrap_up_for_sending();
     251       
     252       
     253        /**
     254         * Uses the old serialization system to serialize itself into a (new style) shared buffer.
     255         */
     256        virtual reboost::shared_buffer_t serialize_into_shared_buffer();
     257       
     258        /*
     259         * XXX experimental
     260         *
     261         * Uses the old serialization system to deserialize itself out of a (new style) shared buffer.
     262         * @return remaining sub-buffer (the "payload")
     263         *
     264         * Note: This is some kind of a hack! handle with care.
     265         */
     266        virtual reboost::shared_buffer_t deserialize_from_shared_buffer(reboost::shared_buffer_t buff);
     267       
    227268
    228269protected:
     
    262303         * @return A explicit payload serializer
    263304         */
    264         finline PayloadSerializer Payload( size_t length = ~0 ) {
     305        finline PayloadSerializer Payload( size_t length = ~0 )
     306        {
     307//          assert( ! legacy_payload_disabled );  // FIXME aktuell
     308           
    265309                return PayloadSerializer( this, length );
    266310        }
  • source/ariba/utility/messages/MessageProvider.cpp

    r6919 r12060  
    3737// [License]
    3838
    39 #include "MessageProvider.h"
    40 
    41 NAMESPACE_BEGIN
    42 
    43 MessageProvider::MessageProvider() {
    44 }
    45 
    46 MessageProvider::~MessageProvider() {
    47 }
    48 
    49 bool MessageProvider::sendMessageToReceivers( const Message* message ) {
    50         bool sent =  false;
    51         for (size_t i=0; i<receivers.size(); i++)
    52                 if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
    53         return sent;
    54 }
    55 
    56 void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
    57         receivers.push_back(receiver);
    58 }
    59 
    60 void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
    61         for (size_t i=0; i<receivers.size(); i++)
    62                 if (receivers[i]==receiver) {
    63                         receivers.erase( receivers.begin()+i );
    64                         break;
    65                 }
    66 }
    67 
    68 NAMESPACE_END
     39//#include "MessageProvider.h"
     40//
     41//NAMESPACE_BEGIN
     42//
     43//MessageProvider::MessageProvider() {
     44//}
     45//
     46//MessageProvider::~MessageProvider() {
     47//}
     48//
     49//bool MessageProvider::sendMessageToReceivers( const Message* message ) {
     50//      bool sent =  false;
     51//      for (size_t i=0; i<receivers.size(); i++)
     52//              if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
     53//      return sent;
     54//}
     55//
     56//void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
     57//      receivers.push_back(receiver);
     58//}
     59//
     60//void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
     61//      for (size_t i=0; i<receivers.size(); i++)
     62//              if (receivers[i]==receiver) {
     63//                      receivers.erase( receivers.begin()+i );
     64//                      break;
     65//              }
     66//}
     67//
     68//NAMESPACE_END
  • source/ariba/utility/messages/MessageProvider.h

    r3690 r12060  
     1// XXX DEPRECATED
     2
    13// [License]
    24// The Ariba-Underlay Copyright
     
    3739// [License]
    3840
     41// XXX DEPRECATED
     42
    3943#ifndef MESSAGEPROVIDER_H_
    4044#define MESSAGEPROVIDER_H_
    4145
    42 #include "_namespace.h"
    43 #include "MessageReceiver.h"
    44 #include "ariba/utility/types/LinkID.h"
    45 #include "ariba/utility/types/NodeID.h"
    46 #include <vector>
    47 
    48 using std::vector;
    49 using ariba::utility::LinkID;
    50 using ariba::utility::NodeID;
    51 
    52 NAMESPACE_BEGIN
    53 
    54 
    55 /**
    56  * This class defines an interface for message providers.
    57  * Implementing classes must allow receivers to register themselves.
    58  *
    59  * @author Sebastian Mies
    60  */
    61 class MessageProvider {
    62 private:
    63         vector<MessageReceiver*> receivers;
    64 
    65 protected:
    66         bool sendMessageToReceivers( const Message* message );
    67 
    68 public:
    69         /**
    70          * Constructor.
    71          */
    72         MessageProvider();
    73 
    74         /**
    75          * Destructor.
    76          */
    77         ~MessageProvider();
    78 
    79         /**
    80          * Adds a message receiver.
    81          *
    82          * @param receiver The receiver.
    83          */
    84         void addMessageReceiver( MessageReceiver* receiver );
    85 
    86         /**
    87          * Removes a message receiver.
    88          *
    89          * @param receiver The receiver.
    90          */
    91         void removeMessageReceiver( MessageReceiver* receiver );
    92 };
    93 
    94 NAMESPACE_END
     46//#include "_namespace.h"
     47//#include "MessageReceiver.h"
     48//#include "ariba/utility/types/LinkID.h"
     49//#include "ariba/utility/types/NodeID.h"
     50//#include <vector>
     51//
     52//using std::vector;
     53//using ariba::utility::LinkID;
     54//using ariba::utility::NodeID;
     55//
     56//NAMESPACE_BEGIN
     57//
     58//
     59///**
     60// * This class defines an interface for message providers.
     61// * Implementing classes must allow receivers to register themselves.
     62// *
     63// * @author Sebastian Mies
     64// */
     65//class MessageProvider {
     66//private:
     67//      vector<MessageReceiver*> receivers;
     68//
     69//protected:
     70//      bool sendMessageToReceivers( const Message* message );
     71//
     72//public:
     73//      /**
     74//       * Constructor.
     75//       */
     76//      MessageProvider();
     77//
     78//      /**
     79//       * Destructor.
     80//       */
     81//      ~MessageProvider();
     82//
     83//      /**
     84//       * Adds a message receiver.
     85//       *
     86//       * @param receiver The receiver.
     87//       */
     88//      void addMessageReceiver( MessageReceiver* receiver );
     89//
     90//      /**
     91//       * Removes a message receiver.
     92//       *
     93//       * @param receiver The receiver.
     94//       */
     95//      void removeMessageReceiver( MessageReceiver* receiver );
     96//};
     97//
     98//NAMESPACE_END
    9599
    96100#endif /* MESSAGEPROVIDER_H_ */
  • source/ariba/utility/messages/MessageReceiver.cpp

    r3690 r12060  
    4949}
    5050
    51 bool MessageReceiver::receiveMessage( const Message* message, const LinkID& link, const NodeID& node ) {
    52         //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
    53         return false;
    54 }
     51//bool MessageReceiver::receiveMessage( reboost::shared_buffer_t message, const LinkID& link, const NodeID& node ) {
     52//      //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
     53//      return false;
     54//}
    5555
    5656NAMESPACE_END
  • source/ariba/utility/messages/MessageReceiver.h

    r3690 r12060  
    4040#define MESSAGERECEIVER_H__
    4141
    42 #include "ariba/utility/messages/Message.h"
     42//#include "ariba/utility/messages/Message.h"
     43// reboost messages
     44#include "ariba/utility/transport/messages/message.hpp"
    4345#include "ariba/utility/types/LinkID.h"
    4446#include "ariba/utility/types/NodeID.h"
     
    7375         * @return True, when the message has been accepted.
    7476         */
    75         virtual bool receiveMessage( const Message* message, const LinkID& link, const NodeID& node );
     77        virtual bool receiveMessage( reboost::shared_buffer_t message,
     78                const LinkID& link,
     79                const NodeID& node,
     80                bool bypass_overlay ) = 0;
    7681};
    7782
  • source/ariba/utility/misc/Demultiplexer.hpp

    r3705 r12060  
    136136
    137137                SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id );
    138                 if( it == mapServiceListener.end() )    return NULL;
     138                if( it == mapServiceListener.end() )    return 0;
    139139                else                                    return it->second;
    140140        }
  • source/ariba/utility/system/StartupWrapper.cpp

    r10700 r12060  
    3636// Telematics.
    3737// [License]
     38
     39// XXX NOTE: Use this class with caution! Config support is outdated.
    3840
    3941#include "StartupWrapper.h"
  • source/ariba/utility/system/StartupWrapper.h

    r4483 r12060  
    3636// Telematics.
    3737// [License]
     38
     39// XXX NOTE: Use this class with caution! Config support is outdated.
    3840
    3941#ifndef __STARTUP_WRAPPER_H
  • source/ariba/utility/system/SystemQueue.cpp

    r7533 r12060  
    6969}
    7070
     71// maps to function call internally to the Event-system
     72void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
     73{
     74    // copy function object
     75    boost::function0<void>* function_ptr = new boost::function0<void>();
     76    (*function_ptr) = function;
     77
     78    // schedule special call-event
     79    scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
     80
     81}
     82
    7183#ifdef UNDERLAY_OMNET
    7284void SystemQueue::handleMessage(cMessage* msg){
  • source/ariba/utility/system/SystemQueue.h

    r7468 r12060  
    6060#endif
    6161
     62#include <boost/function.hpp>
     63
     64
    6265using std::vector;
    6366using boost::posix_time::ptime;
     
    108111         */
    109112        void scheduleEvent( const SystemEvent& event, uint32_t delay = 0 );
     113       
     114        /**
     115         * This method schedules a function call in the SystemQueue.
     116         * (Like scheduleEvent, but to be used with boost::bind.)
     117         *
     118         * @param function: The function to be called [void function()]
     119         * @param The delay in milli-seconds
     120         */
     121        void scheduleCall( const boost::function0<void>& function, uint32_t delay = 0 );
    110122
    111123        /**
     
    170182#endif
    171183
     184       
     185       
    172186private:
    173187
     
    235249        volatile bool systemQueueRunning;
    236250#endif
    237 
     251       
     252       
     253private:
     254    /**
     255     * This inner class handles the function-call events.
     256     * @see SystemQueue::scheduleCall
     257     */
     258    class FunctionCaller  :  public SystemEventListener
     259    {
     260        void handleSystemEvent(const SystemEvent& event)
     261        {
     262            boost::function0<void>* function_ptr = event.getData< boost::function0<void> >();
     263            (*function_ptr)();
     264            delete function_ptr;
     265        }
     266    };
     267
     268    FunctionCaller internal_function_caller;
    238269}; // class SystemQueue
    239270
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/CMakeLists.txt

    r10700 r12060  
    3838
    3939add_headers(
    40     test_transport.hpp
    41     transport_connection.hpp
    42     transport.hpp
    43     transport_listener.hpp
    4440    transport_peer.cpp
    4541    transport_peer.hpp
    46     transport_protocol.hpp
    4742    )
    4843
    49 add_subdir_sources(asio messages rfcomm tcpip)
     44add_subdir_sources(asio messages rfcomm StreamTransport)
     45
  • source/ariba/utility/transport/messages/message.cpp

    r10653 r12060  
    2424        os << "message({size=" << m.size() << ",buffers=" << (int) m.length()
    2525                        << ",hash=" << m.hash() << "},";
    26         m.foreach(ts);
     26        m.msg_foreach(ts);
    2727        os << ")";
    2828        return os;
  • source/ariba/utility/transport/messages/message.hpp

    r10653 r12060  
    1717
    1818/// message size type
    19 typedef signed char mlength_t;
     19//typedef signed char mlength_t;  // <--- don't do this!!
     20//typedef size_t mlength_t;
     21typedef int mlength_t;  // signed int seems necessary
    2022
    2123/// maximum number of buffers per message (default is 8)
    2224const mlength_t message_max_buffers = (1L << 3);
     25//const mlength_t message_max_buffers = (1L << 4);
    2326
    2427//! A Copy-on-Write Message with Shared Buffers.
     
    7073        /// Copy message
    7174        inline message_t(const message_t& msg) :
    72                 imsg(msg.imsg) {
    73                 imsg->owner = NULL;
     75                imsg(msg.imsg)
     76        {
     77            if ( imsg )
     78                imsg->owner = NULL;
    7479        }
    7580
     
    142147        /// Returns the number of buffers inside this message.
    143148        inline mlength_t length() const {
     149            if ( ! imsg )
     150                return 0;
     151           
    144152                return (imsg->length);
    145153        }
     
    167175        /// Iterates over a partial set of buffers.
    168176        template<typename T>
    169         inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
     177        inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
    170178                T op = work;
    171179                if (size_ == 0) size_ = size() - index_;
     
    192200        inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
    193201                struct read_buffer rb = { mem };
    194                 foreach(rb, idx, size_);
     202                msg_foreach(rb, idx, size_);
    195203        }
    196204
     
    198206        inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
    199207                struct write_buffer wb = { mem };
    200                 foreach(wb, idx, size_);
     208                msg_foreach(wb, idx, size_);
    201209        }
    202210
     
    227235                message_t m;
    228236                struct sub_message sm = { &m };
    229                 foreach(sm, index, size);
     237                msg_foreach(sm, index, size);
    230238                return m;
    231239        }
  • source/ariba/utility/transport/messages/shared_buffer.hpp

    r10700 r12060  
    99
    1010#include <cstring>
     11#include <string>
    1112#include <boost/shared_ptr.hpp>
    1213
     
    1819#include "buffer.hpp"
    1920
     21#include <stdexcept>
     22
    2023namespace reboost {
     24
     25class illegal_sub_buffer: public std::runtime_error
     26{
     27public:
     28    /** Takes a character string describing the error.  */
     29    explicit illegal_sub_buffer(const std::string& __arg)  :
     30        std::runtime_error(__arg)
     31    {
     32    }
     33   
     34    virtual ~illegal_sub_buffer() throw() {}
     35};
    2136
    2237/**
     
    104119                parent(new deleteable_buffer(buffer, size))
    105120        {
     121        }
     122
     123//    /// XXX debug... copy!
     124//      /// create shared buffer from buffer
     125//      inline shared_buffer_t(const char* buffer, bsize_t size) :
     126//              buffer_t(), parent(new deleteable_buffer(size)) {
    106127//              memcpy(parent->mutable_data(), buffer, parent->size());
    107 //              data(parent->mutable_data());
    108 //              this->size(parent->size());
    109         }
     128//              data(parent->mutable_data()); this->size(parent->size());
     129//      }
    110130
    111131        /// clone data from a normal buffer
     
    129149
    130150        /// return sub-buffer.
    131         inline self operator()(bsize_t index, bsize_t size = 0) const {
     151        inline self operator()(bsize_t index, bsize_t size = 0) const
     152        {
     153            // special cases
     154            if ( index + size > size_ )
     155            {
     156                // empty sub-buffer
     157            if ( index == size_ )
     158            {
     159                self n;
     160                return n;
     161            }
     162         
     163            // ERROR: index out of bounds
     164            throw illegal_sub_buffer("Index or size out of bounds in shared_buffer");
     165            }
     166
     167            // regular case
    132168                self n(*this);
    133169                n.data_ += index;
  • source/ariba/utility/transport/rfcomm/CMakeLists.txt

    r10700 r12060  
    4040    bluetooth_endpoint.hpp
    4141    bluetooth_rfcomm.hpp
    42     rfcomm_transport.hpp
    4342    )
    4443
    45 add_sources(rfcomm_transport.cpp)
     44#add_sources()
     45
  • source/ariba/utility/transport/transport_peer.cpp

    r10700 r12060  
    1 
    2 #include "ariba/config.h"
    31#include "transport_peer.hpp"
    4 #include "transport.hpp"
    5 #include <boost/asio/ip/tcp.hpp>
     2
     3// ariba
     4#include "StreamTransport/StreamTransport.hpp"
     5#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     6
     7// boost
    68#include <boost/asio/error.hpp>
    79#include <boost/foreach.hpp>
    8 
    9 #ifdef ECLIPSE_PARSER
    10     #define foreach(a, b) for(a : b)
    11 #else
    12     #define foreach(a, b) BOOST_FOREACH(a, b)
    13 #endif
    1410
    1511// namespace ariba::transport
     
    1713namespace transport {
    1814
    19 using namespace ariba::addressing;
     15using namespace addressing2;
    2016using boost::asio::ip::tcp;
    2117
     
    2622use_logging_cpp(transport_peer);
    2723
    28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    29    
    30     // setup tcp transports
    31     foreach(tcp_port_address port, local.tcp) {
     24transport_peer::transport_peer()  :
     25        local(new addressing2::endpoint_set())
     26{
     27}
     28
     29EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints)
     30{
     31    // TCP Endpoints
     32    BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() )
     33    {
     34        // automatic port detection
     35        bool port_detection = false;
     36        uint16_t try_port = 41322;
    3237       
    33         if (local.ip.size() > 0) {
    34             foreach(ip_address ip_addr, local.ip) {
    35                
    36                 tcp::endpoint endp(ip_addr.asio(), port.asio());
    37                 create_service(endp);
    38             }
    39         } else {
    40             tcp::endpoint endp_v6(tcp::v6(), port.asio());
    41             tcp::endpoint endp_v4(tcp::v4(), port.asio());
    42            
    43             create_service(endp_v6);
    44             create_service(endp_v4);
     38        tcp::endpoint asio_endp = endp->to_asio();
     39        if ( asio_endp.port() == 0 )
     40        {
     41            port_detection = true;
    4542        }
    4643       
    47     }
    48    
     44       
     45        // create new server socket
     46        do
     47        {
     48            try
     49            {
     50                // automatic port detection
     51                if ( port_detection )
     52                {
     53                    asio_endp.port(try_port);
     54                    endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp);
     55                }
     56               
     57                TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio()));
     58                transport_streams.push_back(tmp_ptr);
     59                logging_info("Listening on IP/TCP " << endp->to_string());
     60               
     61                local->add_endpoint(endp);
     62                port_detection = false;
     63            }
     64           
     65            catch (boost::system::system_error& e)
     66            {
     67                // address in use
     68                if (e.code() == boost::asio::error::address_in_use)
     69                {
     70                    // BRANCH: automatic port detection
     71                    if ( port_detection )
     72                    {
     73                        // give up ?
     74                        if ( try_port > 41422 )
     75                        {
     76                            logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: "
     77                                << endp->to_string() << ". Endpoint will be ignored!");
     78   
     79                            port_detection = false;
     80                        }
     81                        else
     82                        {
     83                            // try next port
     84                            try_port++;
     85                        }
     86                    }
     87                    // BRANCH: explicit given port --> error
     88                    else
     89                    {
     90                        logging_warn("[WARN] Address already in use: "
     91                            << endp->to_string() << ". Endpoint will be ignored!");
     92                    }
     93                }
     94   
     95                // Rethrow
     96                else
     97                {
     98                    throw;
     99                }
     100            }
     101        } while ( port_detection );
     102    }
     103   
     104    // TODO Bluetooth Endpoints
    49105        #ifdef HAVE_LIBBLUETOOTH
    50     foreach(rfcomm_channel_address channel, local.rfcomm) {
    51         if (local.bluetooth.size() > 0) {
    52                 foreach(mac_address mac, local.bluetooth) {
    53                         rfcomm::endpoint endp(mac.bluetooth(), channel.value());
    54                         create_service(endp);
    55                 }
    56         } else {
    57                 rfcomm::endpoint endp(channel.value());
    58                 create_service(endp);
    59         }
    60     }
     106//    foreach(rfcomm_channel_address channel, local.rfcomm) {
     107//      if (local.bluetooth.size() > 0) {
     108//              foreach(mac_address mac, local.bluetooth) {
     109//                      rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     110//                      create_service(endp);
     111//              }
     112//      } else {
     113//              rfcomm::endpoint endp(channel.value());
     114//              create_service(endp);
     115//      }
     116//    }
    61117        #endif
    62 }
    63 
    64 void transport_peer::create_service(tcp::endpoint endp) {
    65     try {
    66         TcpIpPtr tmp_ptr(new tcpip(endp));
    67         tcps.push_back(tmp_ptr);
    68         logging_info("Listening on IP/TCP " << endp);
    69        
    70     } catch (boost::system::system_error& e) {
    71         if (e.code() == boost::asio::error::address_in_use) {
    72             logging_warn("[WARN] Address already in use: "
    73                     << endp << ". Endpoint will be ignored!");
    74         } else {
    75             // Rethrow
    76             throw;
    77         }
    78     }
    79 }
     118   
     119    return local;
     120}
     121
     122//void transport_peer::create_service(tcp::endpoint endp) {
     123//    try {
     124//        TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp));
     125//        tcps.push_back(tmp_ptr);
     126//        logging_info("Listening on IP/TCP " << endp);
     127//       
     128//    } catch (boost::system::system_error& e) {
     129//        if (e.code() == boost::asio::error::address_in_use) {
     130//            logging_warn("[WARN] Address already in use: "
     131//                    << endp << ". Endpoint will be ignored!");
     132//        } else {
     133//            // Rethrow
     134//            throw;
     135//        }
     136//    }
     137//}
    80138
    81139#ifdef HAVE_LIBBLUETOOTH
    82 void transport_peer::create_service(rfcomm::endpoint endp) {
    83     try {
    84         rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
    85         rfcomms.push_back(tmp_ptr);
    86         logging_info("Listening on bluetooth/RFCOMM " << endp);
    87        
    88     } catch (boost::system::system_error& e) {
    89         if (e.code() == boost::asio::error::address_in_use) {
    90             logging_warn("[WARN] Address already in use: "
    91                     << endp << ". Endpoint will be ignored!");
    92         } else {
    93             // Rethrow
    94             throw;
    95         }
    96     }
    97 }
     140//void transport_peer::create_service(rfcomm::endpoint endp) {
     141//    try {
     142//        TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp));
     143//        rfcomms.push_back(tmp_ptr);
     144//        logging_info("Listening on bluetooth/RFCOMM " << endp);
     145//       
     146//    } catch (boost::system::system_error& e) {
     147//        if (e.code() == boost::asio::error::address_in_use) {
     148//            logging_warn("[WARN] Address already in use: "
     149//                    << endp << ". Endpoint will be ignored!");
     150//        } else {
     151//            // Rethrow
     152//            throw;
     153//        }
     154//    }
     155//}
    98156#endif
    99157
     
    101159}
    102160
    103 void transport_peer::start() {
    104     foreach(TcpIpPtr tcp, tcps) {
    105         tcp->start();
    106     }
    107    
    108 #ifdef HAVE_LIBBLUETOOTH
    109     foreach(rfcomm_transport::sptr x, rfcomms) {
    110         x->start();
    111     }
    112 #endif
    113 }
    114 
    115 void transport_peer::stop() {
    116     foreach(TcpIpPtr tcp, tcps) {
    117         tcp->stop();
    118     }
    119    
    120 #ifdef HAVE_LIBBLUETOOTH
    121         foreach(rfcomm_transport::sptr x, rfcomms) {
    122                 x->stop();
    123         }
    124 #endif
     161void transport_peer::start()
     162{
     163    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     164    {
     165        stream->start();
     166    }
     167}
     168
     169void transport_peer::stop()
     170{
     171    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     172    {
     173        stream->stop();
     174    }
    125175}
    126176
    127177
    128178void transport_peer::send(
    129         const endpoint_set& endpoints,
     179        const const_EndpointSetPtr endpoints,
    130180        reboost::message_t message,
    131181        uint8_t priority)
    132182{
    133     foreach(TcpIpPtr tcp, tcps) {
    134         tcp->send(endpoints, message, priority);
    135     }
    136    
    137 #ifdef HAVE_LIBBLUETOOTH
    138     foreach(rfcomm_transport::sptr x, rfcomms) {
    139                 x->send(endpoints, message, priority);
    140         }
    141 #endif
    142 }
    143 
    144 void transport_peer::terminate( const address_v* remote ) {
    145         if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
    146         {
    147             foreach(TcpIpPtr tcp, tcps) {
    148                 tcp->terminate(remote);
    149             }
    150         }
    151 #ifdef HAVE_LIBBLUETOOTH
    152         if (remote->instanceof<rfcomm_endpoint>()) {
    153                 foreach(rfcomm_transport::sptr x, rfcomms) {
    154                         x->terminate(remote);
    155                 }
    156         }
    157 #endif
    158 }
    159 
    160 void transport_peer::register_listener( transport_listener* listener ) {
    161     foreach(TcpIpPtr tcp, tcps) {
    162         tcp->register_listener(listener);
    163     }
    164    
    165 #ifdef HAVE_LIBBLUETOOTH
    166     foreach(rfcomm_transport::sptr x, rfcomms) {
    167         x->register_listener(listener);
    168     }
    169 #endif
     183    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     184    {
     185        stream->send(endpoints, message, priority);
     186    }
     187}
     188
     189// XXX DEPRECATED
     190//void transport_peer::terminate( const address_v* remote ) {
     191//      if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     192//      {
     193//          foreach(TransportProtocolPtr tcp, tcps) {
     194//              tcp->terminate(remote);
     195//          }
     196//      }
     197//#ifdef HAVE_LIBBLUETOOTH
     198//      if (remote->instanceof<rfcomm_endpoint>()) {
     199//              foreach(TransportProtocolPtr x, rfcomms) {
     200//                      x->terminate(remote);
     201//              }
     202//      }
     203//#endif
     204//}
     205
     206void transport_peer::register_listener( transport_listener* listener )
     207{
     208    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     209    {
     210        stream->register_listener(listener);
     211    }
    170212}
    171213
  • source/ariba/utility/transport/transport_peer.hpp

    r10700 r12060  
    22#define TRANSPORT_PEER_HPP_
    33
     4// ariba
    45#include "ariba/config.h"
    56#include "ariba/utility/logging/Logging.h"
    6 #include "transport_protocol.hpp"
    7 #include "ariba/utility/addressing/endpoint_set.hpp"
     7#include "ariba/utility/addressing2/endpoint_set.hpp"
     8
     9// ariba interfaces
     10#include "interfaces/transport_protocol.hpp"
     11
     12// boost
    813#include <boost/shared_ptr.hpp>
    9 #include "rfcomm/bluetooth_rfcomm.hpp"
     14
     15// boost-adaption
     16//#include "rfcomm/bluetooth_rfcomm.hpp"
    1017
    1118
     
    1421namespace transport {
    1522
    16 using namespace ariba::addressing;
    17 
    18 class tcpip;
    19 
    20 #ifdef HAVE_LIBBLUETOOTH
    21 class rfcomm_transport;
    22 #endif
    23 
    2423/**
    25  * TODO: Doc
     24 * This class allocates implementations of various transport
     25 * protocols and can send messages to an entire set of endpoints
    2626 *
    27  * @author Sebastian Mies <mies@tm.uka.de>
     27 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock
    2828 */
    29 /// this transport peer allocates implementations of various transport
    30 /// protocols and can send messages to an entire set of endpoints
    31 class transport_peer : public transport_protocol {
     29class transport_peer :
     30    public transport_protocol
     31{
    3232        use_logging_h(transport_peer);
     33        typedef boost::shared_ptr<transport_protocol> TransportProtocolPtr;
     34       
    3335public:
    34         transport_peer( endpoint_set& local_set );
     36        transport_peer();
     37       
     38        /**
     39         * Adds endpoints on which ariba should listen ("server"-sockets)
     40         *
     41         * @return An endpoint_set holding all active endpoints ariba is listening on.   
     42         */
     43        addressing2::EndpointSetPtr add_listenOn_endpoints(addressing2::EndpointSetPtr endpoints);
     44       
    3545        virtual ~transport_peer();
    3646        virtual void start();
     
    3848       
    3949        virtual void send(
    40                 const endpoint_set& endpoints,
     50                const addressing2::const_EndpointSetPtr endpoints,
    4151                reboost::message_t message,
    42                 uint8_t priority = 0);
    43        
    44         /// @deprecated: Use terminate() from transport_connection instead
    45         virtual void terminate( const address_v* remote );
    46        
     52                uint8_t priority = system_priority::OVERLAY);
     53               
    4754        virtual void register_listener( transport_listener* listener );
    4855
     56       
    4957private:
    50         void create_service(tcp::endpoint endp);
    51 #ifdef HAVE_LIBBLUETOOTH
    52         void create_service(boost::asio::bluetooth::rfcomm::endpoint endp);
    53 #endif
    54        
    55         endpoint_set&  local;
    56         std::vector< boost::shared_ptr<tcpip> > tcps;
    57 #ifdef HAVE_LIBBLUETOOTH
    58         std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms;
    59 #endif
     58        addressing2::EndpointSetPtr local;
     59        std::vector<TransportProtocolPtr> transport_streams;
    6060};
    6161
  • source/ariba/utility/types/Identifier.h

    r4625 r12060  
    5050#include "ariba/utility/serialization.h"
    5151
     52// XXX EXPERIMENTAL
     53#include "ariba/utility/transport/messages/shared_buffer.hpp"
     54
    5255/**< maximum length of the key */
    5356#define MAX_KEYLENGTH 192
     
    6972        use_logging_h( Identifier );
    7073public:
     74       
     75        // XXX EXPERIMENTAL
     76        reboost::shared_buffer_t serialize() const
     77        {
     78            Data data = data_serialize(this, DEFAULT_V);
     79            reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     80           
     81            return buf;
     82        }
     83       
     84        reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
     85        {
     86            Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8);
     87           
     88            size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8;
     89
     90            // return remaining sub-buffer
     91            return buff(len);
     92        }
     93       
    7194
    7295        //-------------------------------------------------------------------------
  • source/ariba/utility/types/OverlayParameterSet.h

    r4625 r12060  
    5353
    5454        typedef enum _OverlayStructure {
    55                 OverlayStructureOneHop   = 0,
     55                OverlayStructureOneHop   = 0,   // DEPRECATED, DO NOT USE
    5656                OverlayStructureChord    = 1,
    5757        } OverlayStructure;
  • source/ariba/utility/types/ServiceID.cpp

    r3690 r12060  
    7676}
    7777
    78 string ServiceID::toString() const {
     78string ServiceID::toString() const
     79{
     80    if ( *this == ServiceID::UNSPECIFIED )
     81        return "UNSPEC";
     82   
    7983        return ariba::utility::Helper::ultos( id );
    8084}
  • source/services/ariba_dht/Dht.cpp

    r10700 r12060  
    4848void Dht::get(const std::string& key)
    4949{
    50         DhtMessage msg(DhtMessage::DhtGet, key);
     50        DhtMessage msg(DhtMessage::DhtGet, key, node->getNodeId());
    5151       
    5252        handle_dht_message(msg, NodeID::UNSPECIFIED);
     
    155155            case DhtMessage::DhtGet:
    156156            {
    157                 answer_dht_request(message.getKey(), source);
     157                answer_dht_request(message.getKey(), message.getSourceNode());
    158158
    159159                break;
     
    167167                                message.getValues(),
    168168                                message.getTTL());
    169                 answer_dht_request(message.getKey(), source);
     169                answer_dht_request(message.getKey(), message.getSourceNode());
    170170               
    171171                break;
  • source/services/ariba_dht/Dht.h

    r10700 r12060  
    2424using ariba::utility::SystemEventType;
    2525using ariba::utility::SystemEventListener;
     26
     27using ariba::utility::NodeID;
     28using ariba::utility::LinkID;
     29
    2630
    2731// Forward declarations to avoid adding messages/*.h to the public interface
  • source/services/ariba_dht/messages/DhtMessage.cpp

    r10700 r12060  
    1010DhtMessage::DhtMessage() :
    1111        ttl( 0 ),
    12         replace( false )
     12        replace( false ),
     13        sourceNode(NodeID::UNSPECIFIED)
    1314{}
    1415
    15 DhtMessage::DhtMessage( DhtMessageType type, const std::string& key ) :
     16DhtMessage::DhtMessage( DhtMessageType type, const std::string& key, const NodeID& sourceNodeID ) :
    1617        type( static_cast<uint8_t>(type) ),
    1718        ttl( 0 ),
    1819        replace( false ),
    19         key( key )
     20        key( key ),
     21        sourceNode(sourceNodeID)
     22
    2023{}
    2124
    2225DhtMessage::DhtMessage( DhtMessageType type, const std::string& key,
    23                 const std::string& value, uint16_t ttl ) :
     26                const std::string& value, uint16_t ttl, const NodeID& sourceNodeID ) :
    2427        type( static_cast<uint8_t>(type) ),
    2528        ttl( ttl ),
    2629        replace( false ),
    2730        key( key ),
    28         values(1, value)
     31        values(1, value),
     32        sourceNode(sourceNodeID)
    2933{}
    3034
    3135DhtMessage::DhtMessage( DhtMessageType type, const std::string& key,
    32                 const vector<string>& values, uint16_t ttl ) :
     36                const vector<string>& values, uint16_t ttl, const NodeID& sourceNodeID ) :
    3337        type( static_cast<uint8_t>(type) ),
    3438        ttl( ttl ),
    3539        replace( false ),
    36         key( key )
     40        key( key ),
     41        sourceNode(sourceNodeID)
    3742{
    3843        // preallocate enough room so we don't need to copy a lot
     
    4651}
    4752
     53string DhtMessage::DhtMessageTypeToString(DhtMessageType type) {
     54        string temp;
     55        switch (type)
     56        {
     57                case DhtMessage::DhtInvalid:
     58                {
     59                        temp = "DhtInvalid";
     60                        break;
     61                }
     62
     63                case DhtMessage::DhtGet:
     64                {
     65                        temp = "DhtGet";
     66                        break;
     67                }
     68
     69                case DhtMessage::DhtPut:
     70                {
     71                        temp = "DhtPut";
     72                        break;
     73                }
     74
     75                case DhtMessage::DhtPutAndGet:
     76                {
     77                        temp = "DhtPutAndGet";
     78                        break;
     79                }
     80
     81                case DhtMessage::DhtRemove:
     82                {
     83                        temp = "DhtRemove";
     84                        break;
     85                }
     86
     87                case DhtMessage::DhtRepublish:
     88                {
     89                        temp = "DhtRepublish";
     90                        break;
     91                }
     92
     93                case DhtMessage::DhtAnswer:
     94                {
     95                        temp =  "DhtAnswer";
     96                        break;
     97                }
     98
     99                case DhtMessage::DhtReplica:
     100                {
     101                        temp = "DhtReplica";
     102                        break;
     103                }
     104        }
     105
     106        return temp;
     107}
     108
    48109}}
  • source/services/ariba_dht/messages/DhtMessage.h

    r10700 r12060  
    33
    44#include "ariba/utility/messages.h"
     5#include "ariba/utility/types/NodeID.h"
    56#include "ariba/utility/serialization.h"
    67#include "ariba/Name.h"
     
    1011
    1112using ariba::utility::Message;
     13using ariba::utility::NodeID;
    1214using_serialization;
    1315
     
    2123                DhtRemove = 4,
    2224                DhtRepublish = 5,
    23                 DhtAnswer = 8
     25                DhtAnswer = 8,
     26                DhtReplica = 9
    2427        } DhtMessageType;
    2528       
    2629        DhtMessage();
    27         DhtMessage( DhtMessageType type, const std::string& key );
     30        DhtMessage( DhtMessageType type, const std::string& key, const NodeID& sourceNodeID = NodeID::UNSPECIFIED);
    2831        DhtMessage( DhtMessageType type, const std::string& key,
    29                         const std::string& value, uint16_t ttl = 0 );
     32                        const std::string& value, uint16_t ttl = 0, const NodeID& sourceNodeID = NodeID::UNSPECIFIED );
    3033       
    3134        DhtMessage( DhtMessageType type, const std::string& key,
    32                         const vector<std::string>& values, uint16_t ttl = 0 );
     35                        const vector<std::string>& values, uint16_t ttl = 0, const NodeID& sourceNodeID = NodeID::UNSPECIFIED );
    3336       
    3437        virtual ~DhtMessage();
     
    7780        }
    7881
     82        NodeID getSourceNode() const {
     83                return sourceNode;
     84        }
     85
    7986        bool doReplace() const {
    8087                return replace;
    8188        }
    8289
     90        static string DhtMessageTypeToString(DhtMessageType type);
    8391
    84 private:
     92protected:
    8593        uint8_t type;
    8694        uint16_t ttl;
     
    8896        std::string key;
    8997        vector<std::string> values;
     98        NodeID sourceNode;
     99
     100private:
    90101};
    91102
     
    100111        // key serialization
    101112        X && T(key);
     113
     114        X && &sourceNode;
    102115
    103116        // store number of values
Note: See TracChangeset for help on using the changeset viewer.