Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source/ariba/utility/transport
Files:
9 added
8 deleted
8 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/CMakeLists.txt

    r10700 r12060  
    3838
    3939add_headers(
    40     test_transport.hpp
    41     transport_connection.hpp
    42     transport.hpp
    43     transport_listener.hpp
    4440    transport_peer.cpp
    4541    transport_peer.hpp
    46     transport_protocol.hpp
    4742    )
    4843
    49 add_subdir_sources(asio messages rfcomm tcpip)
     44add_subdir_sources(asio messages rfcomm StreamTransport)
     45
  • source/ariba/utility/transport/messages/message.cpp

    r10653 r12060  
    2424        os << "message({size=" << m.size() << ",buffers=" << (int) m.length()
    2525                        << ",hash=" << m.hash() << "},";
    26         m.foreach(ts);
     26        m.msg_foreach(ts);
    2727        os << ")";
    2828        return os;
  • source/ariba/utility/transport/messages/message.hpp

    r10653 r12060  
    1717
    1818/// message size type
    19 typedef signed char mlength_t;
     19//typedef signed char mlength_t;  // <--- don't do this!!
     20//typedef size_t mlength_t;
     21typedef int mlength_t;  // signed int seems necessary
    2022
    2123/// maximum number of buffers per message (default is 8)
    2224const mlength_t message_max_buffers = (1L << 3);
     25//const mlength_t message_max_buffers = (1L << 4);
    2326
    2427//! A Copy-on-Write Message with Shared Buffers.
     
    7073        /// Copy message
    7174        inline message_t(const message_t& msg) :
    72                 imsg(msg.imsg) {
    73                 imsg->owner = NULL;
     75                imsg(msg.imsg)
     76        {
     77            if ( imsg )
     78                imsg->owner = NULL;
    7479        }
    7580
     
    142147        /// Returns the number of buffers inside this message.
    143148        inline mlength_t length() const {
     149            if ( ! imsg )
     150                return 0;
     151           
    144152                return (imsg->length);
    145153        }
     
    167175        /// Iterates over a partial set of buffers.
    168176        template<typename T>
    169         inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
     177        inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
    170178                T op = work;
    171179                if (size_ == 0) size_ = size() - index_;
     
    192200        inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
    193201                struct read_buffer rb = { mem };
    194                 foreach(rb, idx, size_);
     202                msg_foreach(rb, idx, size_);
    195203        }
    196204
     
    198206        inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
    199207                struct write_buffer wb = { mem };
    200                 foreach(wb, idx, size_);
     208                msg_foreach(wb, idx, size_);
    201209        }
    202210
     
    227235                message_t m;
    228236                struct sub_message sm = { &m };
    229                 foreach(sm, index, size);
     237                msg_foreach(sm, index, size);
    230238                return m;
    231239        }
  • source/ariba/utility/transport/messages/shared_buffer.hpp

    r10700 r12060  
    99
    1010#include <cstring>
     11#include <string>
    1112#include <boost/shared_ptr.hpp>
    1213
     
    1819#include "buffer.hpp"
    1920
     21#include <stdexcept>
     22
    2023namespace reboost {
     24
     25class illegal_sub_buffer: public std::runtime_error
     26{
     27public:
     28    /** Takes a character string describing the error.  */
     29    explicit illegal_sub_buffer(const std::string& __arg)  :
     30        std::runtime_error(__arg)
     31    {
     32    }
     33   
     34    virtual ~illegal_sub_buffer() throw() {}
     35};
    2136
    2237/**
     
    104119                parent(new deleteable_buffer(buffer, size))
    105120        {
     121        }
     122
     123//    /// XXX debug... copy!
     124//      /// create shared buffer from buffer
     125//      inline shared_buffer_t(const char* buffer, bsize_t size) :
     126//              buffer_t(), parent(new deleteable_buffer(size)) {
    106127//              memcpy(parent->mutable_data(), buffer, parent->size());
    107 //              data(parent->mutable_data());
    108 //              this->size(parent->size());
    109         }
     128//              data(parent->mutable_data()); this->size(parent->size());
     129//      }
    110130
    111131        /// clone data from a normal buffer
     
    129149
    130150        /// return sub-buffer.
    131         inline self operator()(bsize_t index, bsize_t size = 0) const {
     151        inline self operator()(bsize_t index, bsize_t size = 0) const
     152        {
     153            // special cases
     154            if ( index + size > size_ )
     155            {
     156                // empty sub-buffer
     157            if ( index == size_ )
     158            {
     159                self n;
     160                return n;
     161            }
     162         
     163            // ERROR: index out of bounds
     164            throw illegal_sub_buffer("Index or size out of bounds in shared_buffer");
     165            }
     166
     167            // regular case
    132168                self n(*this);
    133169                n.data_ += index;
  • source/ariba/utility/transport/rfcomm/CMakeLists.txt

    r10700 r12060  
    4040    bluetooth_endpoint.hpp
    4141    bluetooth_rfcomm.hpp
    42     rfcomm_transport.hpp
    4342    )
    4443
    45 add_sources(rfcomm_transport.cpp)
     44#add_sources()
     45
  • source/ariba/utility/transport/transport_peer.cpp

    r10700 r12060  
    1 
    2 #include "ariba/config.h"
    31#include "transport_peer.hpp"
    4 #include "transport.hpp"
    5 #include <boost/asio/ip/tcp.hpp>
     2
     3// ariba
     4#include "StreamTransport/StreamTransport.hpp"
     5#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     6
     7// boost
    68#include <boost/asio/error.hpp>
    79#include <boost/foreach.hpp>
    8 
    9 #ifdef ECLIPSE_PARSER
    10     #define foreach(a, b) for(a : b)
    11 #else
    12     #define foreach(a, b) BOOST_FOREACH(a, b)
    13 #endif
    1410
    1511// namespace ariba::transport
     
    1713namespace transport {
    1814
    19 using namespace ariba::addressing;
     15using namespace addressing2;
    2016using boost::asio::ip::tcp;
    2117
     
    2622use_logging_cpp(transport_peer);
    2723
    28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    29    
    30     // setup tcp transports
    31     foreach(tcp_port_address port, local.tcp) {
     24transport_peer::transport_peer()  :
     25        local(new addressing2::endpoint_set())
     26{
     27}
     28
     29EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints)
     30{
     31    // TCP Endpoints
     32    BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() )
     33    {
     34        // automatic port detection
     35        bool port_detection = false;
     36        uint16_t try_port = 41322;
    3237       
    33         if (local.ip.size() > 0) {
    34             foreach(ip_address ip_addr, local.ip) {
    35                
    36                 tcp::endpoint endp(ip_addr.asio(), port.asio());
    37                 create_service(endp);
    38             }
    39         } else {
    40             tcp::endpoint endp_v6(tcp::v6(), port.asio());
    41             tcp::endpoint endp_v4(tcp::v4(), port.asio());
    42            
    43             create_service(endp_v6);
    44             create_service(endp_v4);
     38        tcp::endpoint asio_endp = endp->to_asio();
     39        if ( asio_endp.port() == 0 )
     40        {
     41            port_detection = true;
    4542        }
    4643       
    47     }
    48    
     44       
     45        // create new server socket
     46        do
     47        {
     48            try
     49            {
     50                // automatic port detection
     51                if ( port_detection )
     52                {
     53                    asio_endp.port(try_port);
     54                    endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp);
     55                }
     56               
     57                TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio()));
     58                transport_streams.push_back(tmp_ptr);
     59                logging_info("Listening on IP/TCP " << endp->to_string());
     60               
     61                local->add_endpoint(endp);
     62                port_detection = false;
     63            }
     64           
     65            catch (boost::system::system_error& e)
     66            {
     67                // address in use
     68                if (e.code() == boost::asio::error::address_in_use)
     69                {
     70                    // BRANCH: automatic port detection
     71                    if ( port_detection )
     72                    {
     73                        // give up ?
     74                        if ( try_port > 41422 )
     75                        {
     76                            logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: "
     77                                << endp->to_string() << ". Endpoint will be ignored!");
     78   
     79                            port_detection = false;
     80                        }
     81                        else
     82                        {
     83                            // try next port
     84                            try_port++;
     85                        }
     86                    }
     87                    // BRANCH: explicit given port --> error
     88                    else
     89                    {
     90                        logging_warn("[WARN] Address already in use: "
     91                            << endp->to_string() << ". Endpoint will be ignored!");
     92                    }
     93                }
     94   
     95                // Rethrow
     96                else
     97                {
     98                    throw;
     99                }
     100            }
     101        } while ( port_detection );
     102    }
     103   
     104    // TODO Bluetooth Endpoints
    49105        #ifdef HAVE_LIBBLUETOOTH
    50     foreach(rfcomm_channel_address channel, local.rfcomm) {
    51         if (local.bluetooth.size() > 0) {
    52                 foreach(mac_address mac, local.bluetooth) {
    53                         rfcomm::endpoint endp(mac.bluetooth(), channel.value());
    54                         create_service(endp);
    55                 }
    56         } else {
    57                 rfcomm::endpoint endp(channel.value());
    58                 create_service(endp);
    59         }
    60     }
     106//    foreach(rfcomm_channel_address channel, local.rfcomm) {
     107//      if (local.bluetooth.size() > 0) {
     108//              foreach(mac_address mac, local.bluetooth) {
     109//                      rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     110//                      create_service(endp);
     111//              }
     112//      } else {
     113//              rfcomm::endpoint endp(channel.value());
     114//              create_service(endp);
     115//      }
     116//    }
    61117        #endif
    62 }
    63 
    64 void transport_peer::create_service(tcp::endpoint endp) {
    65     try {
    66         TcpIpPtr tmp_ptr(new tcpip(endp));
    67         tcps.push_back(tmp_ptr);
    68         logging_info("Listening on IP/TCP " << endp);
    69        
    70     } catch (boost::system::system_error& e) {
    71         if (e.code() == boost::asio::error::address_in_use) {
    72             logging_warn("[WARN] Address already in use: "
    73                     << endp << ". Endpoint will be ignored!");
    74         } else {
    75             // Rethrow
    76             throw;
    77         }
    78     }
    79 }
     118   
     119    return local;
     120}
     121
     122//void transport_peer::create_service(tcp::endpoint endp) {
     123//    try {
     124//        TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp));
     125//        tcps.push_back(tmp_ptr);
     126//        logging_info("Listening on IP/TCP " << endp);
     127//       
     128//    } catch (boost::system::system_error& e) {
     129//        if (e.code() == boost::asio::error::address_in_use) {
     130//            logging_warn("[WARN] Address already in use: "
     131//                    << endp << ". Endpoint will be ignored!");
     132//        } else {
     133//            // Rethrow
     134//            throw;
     135//        }
     136//    }
     137//}
    80138
    81139#ifdef HAVE_LIBBLUETOOTH
    82 void transport_peer::create_service(rfcomm::endpoint endp) {
    83     try {
    84         rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
    85         rfcomms.push_back(tmp_ptr);
    86         logging_info("Listening on bluetooth/RFCOMM " << endp);
    87        
    88     } catch (boost::system::system_error& e) {
    89         if (e.code() == boost::asio::error::address_in_use) {
    90             logging_warn("[WARN] Address already in use: "
    91                     << endp << ". Endpoint will be ignored!");
    92         } else {
    93             // Rethrow
    94             throw;
    95         }
    96     }
    97 }
     140//void transport_peer::create_service(rfcomm::endpoint endp) {
     141//    try {
     142//        TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp));
     143//        rfcomms.push_back(tmp_ptr);
     144//        logging_info("Listening on bluetooth/RFCOMM " << endp);
     145//       
     146//    } catch (boost::system::system_error& e) {
     147//        if (e.code() == boost::asio::error::address_in_use) {
     148//            logging_warn("[WARN] Address already in use: "
     149//                    << endp << ". Endpoint will be ignored!");
     150//        } else {
     151//            // Rethrow
     152//            throw;
     153//        }
     154//    }
     155//}
    98156#endif
    99157
     
    101159}
    102160
    103 void transport_peer::start() {
    104     foreach(TcpIpPtr tcp, tcps) {
    105         tcp->start();
    106     }
    107    
    108 #ifdef HAVE_LIBBLUETOOTH
    109     foreach(rfcomm_transport::sptr x, rfcomms) {
    110         x->start();
    111     }
    112 #endif
    113 }
    114 
    115 void transport_peer::stop() {
    116     foreach(TcpIpPtr tcp, tcps) {
    117         tcp->stop();
    118     }
    119    
    120 #ifdef HAVE_LIBBLUETOOTH
    121         foreach(rfcomm_transport::sptr x, rfcomms) {
    122                 x->stop();
    123         }
    124 #endif
     161void transport_peer::start()
     162{
     163    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     164    {
     165        stream->start();
     166    }
     167}
     168
     169void transport_peer::stop()
     170{
     171    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     172    {
     173        stream->stop();
     174    }
    125175}
    126176
    127177
    128178void transport_peer::send(
    129         const endpoint_set& endpoints,
     179        const const_EndpointSetPtr endpoints,
    130180        reboost::message_t message,
    131181        uint8_t priority)
    132182{
    133     foreach(TcpIpPtr tcp, tcps) {
    134         tcp->send(endpoints, message, priority);
    135     }
    136    
    137 #ifdef HAVE_LIBBLUETOOTH
    138     foreach(rfcomm_transport::sptr x, rfcomms) {
    139                 x->send(endpoints, message, priority);
    140         }
    141 #endif
    142 }
    143 
    144 void transport_peer::terminate( const address_v* remote ) {
    145         if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
    146         {
    147             foreach(TcpIpPtr tcp, tcps) {
    148                 tcp->terminate(remote);
    149             }
    150         }
    151 #ifdef HAVE_LIBBLUETOOTH
    152         if (remote->instanceof<rfcomm_endpoint>()) {
    153                 foreach(rfcomm_transport::sptr x, rfcomms) {
    154                         x->terminate(remote);
    155                 }
    156         }
    157 #endif
    158 }
    159 
    160 void transport_peer::register_listener( transport_listener* listener ) {
    161     foreach(TcpIpPtr tcp, tcps) {
    162         tcp->register_listener(listener);
    163     }
    164    
    165 #ifdef HAVE_LIBBLUETOOTH
    166     foreach(rfcomm_transport::sptr x, rfcomms) {
    167         x->register_listener(listener);
    168     }
    169 #endif
     183    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     184    {
     185        stream->send(endpoints, message, priority);
     186    }
     187}
     188
     189// XXX DEPRECATED
     190//void transport_peer::terminate( const address_v* remote ) {
     191//      if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     192//      {
     193//          foreach(TransportProtocolPtr tcp, tcps) {
     194//              tcp->terminate(remote);
     195//          }
     196//      }
     197//#ifdef HAVE_LIBBLUETOOTH
     198//      if (remote->instanceof<rfcomm_endpoint>()) {
     199//              foreach(TransportProtocolPtr x, rfcomms) {
     200//                      x->terminate(remote);
     201//              }
     202//      }
     203//#endif
     204//}
     205
     206void transport_peer::register_listener( transport_listener* listener )
     207{
     208    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     209    {
     210        stream->register_listener(listener);
     211    }
    170212}
    171213
  • source/ariba/utility/transport/transport_peer.hpp

    r10700 r12060  
    22#define TRANSPORT_PEER_HPP_
    33
     4// ariba
    45#include "ariba/config.h"
    56#include "ariba/utility/logging/Logging.h"
    6 #include "transport_protocol.hpp"
    7 #include "ariba/utility/addressing/endpoint_set.hpp"
     7#include "ariba/utility/addressing2/endpoint_set.hpp"
     8
     9// ariba interfaces
     10#include "interfaces/transport_protocol.hpp"
     11
     12// boost
    813#include <boost/shared_ptr.hpp>
    9 #include "rfcomm/bluetooth_rfcomm.hpp"
     14
     15// boost-adaption
     16//#include "rfcomm/bluetooth_rfcomm.hpp"
    1017
    1118
     
    1421namespace transport {
    1522
    16 using namespace ariba::addressing;
    17 
    18 class tcpip;
    19 
    20 #ifdef HAVE_LIBBLUETOOTH
    21 class rfcomm_transport;
    22 #endif
    23 
    2423/**
    25  * TODO: Doc
     24 * This class allocates implementations of various transport
     25 * protocols and can send messages to an entire set of endpoints
    2626 *
    27  * @author Sebastian Mies <mies@tm.uka.de>
     27 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock
    2828 */
    29 /// this transport peer allocates implementations of various transport
    30 /// protocols and can send messages to an entire set of endpoints
    31 class transport_peer : public transport_protocol {
     29class transport_peer :
     30    public transport_protocol
     31{
    3232        use_logging_h(transport_peer);
     33        typedef boost::shared_ptr<transport_protocol> TransportProtocolPtr;
     34       
    3335public:
    34         transport_peer( endpoint_set& local_set );
     36        transport_peer();
     37       
     38        /**
     39         * Adds endpoints on which ariba should listen ("server"-sockets)
     40         *
     41         * @return An endpoint_set holding all active endpoints ariba is listening on.   
     42         */
     43        addressing2::EndpointSetPtr add_listenOn_endpoints(addressing2::EndpointSetPtr endpoints);
     44       
    3545        virtual ~transport_peer();
    3646        virtual void start();
     
    3848       
    3949        virtual void send(
    40                 const endpoint_set& endpoints,
     50                const addressing2::const_EndpointSetPtr endpoints,
    4151                reboost::message_t message,
    42                 uint8_t priority = 0);
    43        
    44         /// @deprecated: Use terminate() from transport_connection instead
    45         virtual void terminate( const address_v* remote );
    46        
     52                uint8_t priority = system_priority::OVERLAY);
     53               
    4754        virtual void register_listener( transport_listener* listener );
    4855
     56       
    4957private:
    50         void create_service(tcp::endpoint endp);
    51 #ifdef HAVE_LIBBLUETOOTH
    52         void create_service(boost::asio::bluetooth::rfcomm::endpoint endp);
    53 #endif
    54        
    55         endpoint_set&  local;
    56         std::vector< boost::shared_ptr<tcpip> > tcps;
    57 #ifdef HAVE_LIBBLUETOOTH
    58         std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms;
    59 #endif
     58        addressing2::EndpointSetPtr local;
     59        std::vector<TransportProtocolPtr> transport_streams;
    6060};
    6161
Note: See TracChangeset for help on using the changeset viewer.