Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source/ariba/utility/transport
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/transport_peer.cpp

    r10700 r12060  
    1 
    2 #include "ariba/config.h"
    31#include "transport_peer.hpp"
    4 #include "transport.hpp"
    5 #include <boost/asio/ip/tcp.hpp>
     2
     3// ariba
     4#include "StreamTransport/StreamTransport.hpp"
     5#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     6
     7// boost
    68#include <boost/asio/error.hpp>
    79#include <boost/foreach.hpp>
    8 
    9 #ifdef ECLIPSE_PARSER
    10     #define foreach(a, b) for(a : b)
    11 #else
    12     #define foreach(a, b) BOOST_FOREACH(a, b)
    13 #endif
    1410
    1511// namespace ariba::transport
     
    1713namespace transport {
    1814
    19 using namespace ariba::addressing;
     15using namespace addressing2;
    2016using boost::asio::ip::tcp;
    2117
     
    2622use_logging_cpp(transport_peer);
    2723
    28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    29    
    30     // setup tcp transports
    31     foreach(tcp_port_address port, local.tcp) {
     24transport_peer::transport_peer()  :
     25        local(new addressing2::endpoint_set())
     26{
     27}
     28
     29EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints)
     30{
     31    // TCP Endpoints
     32    BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() )
     33    {
     34        // automatic port detection
     35        bool port_detection = false;
     36        uint16_t try_port = 41322;
    3237       
    33         if (local.ip.size() > 0) {
    34             foreach(ip_address ip_addr, local.ip) {
    35                
    36                 tcp::endpoint endp(ip_addr.asio(), port.asio());
    37                 create_service(endp);
    38             }
    39         } else {
    40             tcp::endpoint endp_v6(tcp::v6(), port.asio());
    41             tcp::endpoint endp_v4(tcp::v4(), port.asio());
    42            
    43             create_service(endp_v6);
    44             create_service(endp_v4);
     38        tcp::endpoint asio_endp = endp->to_asio();
     39        if ( asio_endp.port() == 0 )
     40        {
     41            port_detection = true;
    4542        }
    4643       
    47     }
    48    
     44       
     45        // create new server socket
     46        do
     47        {
     48            try
     49            {
     50                // automatic port detection
     51                if ( port_detection )
     52                {
     53                    asio_endp.port(try_port);
     54                    endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp);
     55                }
     56               
     57                TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio()));
     58                transport_streams.push_back(tmp_ptr);
     59                logging_info("Listening on IP/TCP " << endp->to_string());
     60               
     61                local->add_endpoint(endp);
     62                port_detection = false;
     63            }
     64           
     65            catch (boost::system::system_error& e)
     66            {
     67                // address in use
     68                if (e.code() == boost::asio::error::address_in_use)
     69                {
     70                    // BRANCH: automatic port detection
     71                    if ( port_detection )
     72                    {
     73                        // give up ?
     74                        if ( try_port > 41422 )
     75                        {
     76                            logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: "
     77                                << endp->to_string() << ". Endpoint will be ignored!");
     78   
     79                            port_detection = false;
     80                        }
     81                        else
     82                        {
     83                            // try next port
     84                            try_port++;
     85                        }
     86                    }
     87                    // BRANCH: explicit given port --> error
     88                    else
     89                    {
     90                        logging_warn("[WARN] Address already in use: "
     91                            << endp->to_string() << ". Endpoint will be ignored!");
     92                    }
     93                }
     94   
     95                // Rethrow
     96                else
     97                {
     98                    throw;
     99                }
     100            }
     101        } while ( port_detection );
     102    }
     103   
     104    // TODO Bluetooth Endpoints
    49105        #ifdef HAVE_LIBBLUETOOTH
    50     foreach(rfcomm_channel_address channel, local.rfcomm) {
    51         if (local.bluetooth.size() > 0) {
    52                 foreach(mac_address mac, local.bluetooth) {
    53                         rfcomm::endpoint endp(mac.bluetooth(), channel.value());
    54                         create_service(endp);
    55                 }
    56         } else {
    57                 rfcomm::endpoint endp(channel.value());
    58                 create_service(endp);
    59         }
    60     }
     106//    foreach(rfcomm_channel_address channel, local.rfcomm) {
     107//      if (local.bluetooth.size() > 0) {
     108//              foreach(mac_address mac, local.bluetooth) {
     109//                      rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     110//                      create_service(endp);
     111//              }
     112//      } else {
     113//              rfcomm::endpoint endp(channel.value());
     114//              create_service(endp);
     115//      }
     116//    }
    61117        #endif
    62 }
    63 
    64 void transport_peer::create_service(tcp::endpoint endp) {
    65     try {
    66         TcpIpPtr tmp_ptr(new tcpip(endp));
    67         tcps.push_back(tmp_ptr);
    68         logging_info("Listening on IP/TCP " << endp);
    69        
    70     } catch (boost::system::system_error& e) {
    71         if (e.code() == boost::asio::error::address_in_use) {
    72             logging_warn("[WARN] Address already in use: "
    73                     << endp << ". Endpoint will be ignored!");
    74         } else {
    75             // Rethrow
    76             throw;
    77         }
    78     }
    79 }
     118   
     119    return local;
     120}
     121
     122//void transport_peer::create_service(tcp::endpoint endp) {
     123//    try {
     124//        TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp));
     125//        tcps.push_back(tmp_ptr);
     126//        logging_info("Listening on IP/TCP " << endp);
     127//       
     128//    } catch (boost::system::system_error& e) {
     129//        if (e.code() == boost::asio::error::address_in_use) {
     130//            logging_warn("[WARN] Address already in use: "
     131//                    << endp << ". Endpoint will be ignored!");
     132//        } else {
     133//            // Rethrow
     134//            throw;
     135//        }
     136//    }
     137//}
    80138
    81139#ifdef HAVE_LIBBLUETOOTH
    82 void transport_peer::create_service(rfcomm::endpoint endp) {
    83     try {
    84         rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
    85         rfcomms.push_back(tmp_ptr);
    86         logging_info("Listening on bluetooth/RFCOMM " << endp);
    87        
    88     } catch (boost::system::system_error& e) {
    89         if (e.code() == boost::asio::error::address_in_use) {
    90             logging_warn("[WARN] Address already in use: "
    91                     << endp << ". Endpoint will be ignored!");
    92         } else {
    93             // Rethrow
    94             throw;
    95         }
    96     }
    97 }
     140//void transport_peer::create_service(rfcomm::endpoint endp) {
     141//    try {
     142//        TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp));
     143//        rfcomms.push_back(tmp_ptr);
     144//        logging_info("Listening on bluetooth/RFCOMM " << endp);
     145//       
     146//    } catch (boost::system::system_error& e) {
     147//        if (e.code() == boost::asio::error::address_in_use) {
     148//            logging_warn("[WARN] Address already in use: "
     149//                    << endp << ". Endpoint will be ignored!");
     150//        } else {
     151//            // Rethrow
     152//            throw;
     153//        }
     154//    }
     155//}
    98156#endif
    99157
     
    101159}
    102160
    103 void transport_peer::start() {
    104     foreach(TcpIpPtr tcp, tcps) {
    105         tcp->start();
    106     }
    107    
    108 #ifdef HAVE_LIBBLUETOOTH
    109     foreach(rfcomm_transport::sptr x, rfcomms) {
    110         x->start();
    111     }
    112 #endif
    113 }
    114 
    115 void transport_peer::stop() {
    116     foreach(TcpIpPtr tcp, tcps) {
    117         tcp->stop();
    118     }
    119    
    120 #ifdef HAVE_LIBBLUETOOTH
    121         foreach(rfcomm_transport::sptr x, rfcomms) {
    122                 x->stop();
    123         }
    124 #endif
     161void transport_peer::start()
     162{
     163    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     164    {
     165        stream->start();
     166    }
     167}
     168
     169void transport_peer::stop()
     170{
     171    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     172    {
     173        stream->stop();
     174    }
    125175}
    126176
    127177
    128178void transport_peer::send(
    129         const endpoint_set& endpoints,
     179        const const_EndpointSetPtr endpoints,
    130180        reboost::message_t message,
    131181        uint8_t priority)
    132182{
    133     foreach(TcpIpPtr tcp, tcps) {
    134         tcp->send(endpoints, message, priority);
    135     }
    136    
    137 #ifdef HAVE_LIBBLUETOOTH
    138     foreach(rfcomm_transport::sptr x, rfcomms) {
    139                 x->send(endpoints, message, priority);
    140         }
    141 #endif
    142 }
    143 
    144 void transport_peer::terminate( const address_v* remote ) {
    145         if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
    146         {
    147             foreach(TcpIpPtr tcp, tcps) {
    148                 tcp->terminate(remote);
    149             }
    150         }
    151 #ifdef HAVE_LIBBLUETOOTH
    152         if (remote->instanceof<rfcomm_endpoint>()) {
    153                 foreach(rfcomm_transport::sptr x, rfcomms) {
    154                         x->terminate(remote);
    155                 }
    156         }
    157 #endif
    158 }
    159 
    160 void transport_peer::register_listener( transport_listener* listener ) {
    161     foreach(TcpIpPtr tcp, tcps) {
    162         tcp->register_listener(listener);
    163     }
    164    
    165 #ifdef HAVE_LIBBLUETOOTH
    166     foreach(rfcomm_transport::sptr x, rfcomms) {
    167         x->register_listener(listener);
    168     }
    169 #endif
     183    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     184    {
     185        stream->send(endpoints, message, priority);
     186    }
     187}
     188
     189// XXX DEPRECATED
     190//void transport_peer::terminate( const address_v* remote ) {
     191//      if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     192//      {
     193//          foreach(TransportProtocolPtr tcp, tcps) {
     194//              tcp->terminate(remote);
     195//          }
     196//      }
     197//#ifdef HAVE_LIBBLUETOOTH
     198//      if (remote->instanceof<rfcomm_endpoint>()) {
     199//              foreach(TransportProtocolPtr x, rfcomms) {
     200//                      x->terminate(remote);
     201//              }
     202//      }
     203//#endif
     204//}
     205
     206void transport_peer::register_listener( transport_listener* listener )
     207{
     208    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     209    {
     210        stream->register_listener(listener);
     211    }
    170212}
    171213
Note: See TracChangeset for help on using the changeset viewer.