Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (12 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

Location:
source/ariba/utility/transport
Files:
17 added
7 deleted
8 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/tcpip/tcpip.cpp

    r10075 r10653  
    11#include "tcpip.hpp"
    22
    3 #define _NO_LOGGING
    4 
    5 // std includes
    6 #include <unistd.h>
    7 #include <iostream>
    8 #include <string>
    9 #include <sstream>
    10 #include <boost/foreach.hpp>
    11 
    12 // protlib includes
    13 #include "protlib/network_message.h"
    14 #include "protlib/tp_over_tcp.h"
    15 #include "protlib/tperror.h"
    16 #include "protlib/logfile.h"
    17 #include "protlib/queuemanager.h"
    18 #include "protlib/threadsafe_db.h"
    19 #include "protlib/setuid.h"
    20 
    21 // protlib namespaces
    22 using namespace protlib;
    23 using namespace protlib::log;
    24 
    25 logfile commonlog;
    26 protlib::log::logfile& protlib::log::DefaultLog(commonlog);
     3#include <boost/array.hpp>
     4
     5// interface discovery for link-local destinations
     6#include <ifaddrs.h>
    277
    288namespace ariba {
    299namespace transport {
    3010
     11use_logging_cpp(tcpip)
     12
    3113using namespace ariba::addressing;
    3214
    33 
    34 tcpip_endpoint convert( const appladdress* addr ) {
    35         const char* ip_str = addr->get_ip_str();
    36         tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
    37         return endpoint;
    38 }
    39 
    40 appladdress convert( const tcpip_endpoint& endpoint ) {
    41         tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
    42         appladdress
    43                 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
    44 //      cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
    45         return peer;
    46 }
    47 
    48 tcpip::tcpip( uint16_t port ) :
    49         done ( false ),
    50         running ( false ),
    51         port( port ),
    52         tpreceivethread ( NULL ),
    53         tpthread ( NULL ),
    54         listener ( NULL ) {
    55 }
    56 
    57 tcpip::~tcpip() {
    58         if (running) stop();
    59 }
    60 
    61 bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
    62         clen_bytes = m.decode32();
    63         m.set_pos_r(-4);
    64         return true;
    65 }
    66 
    67 void tcpip::start() {
    68         done = false;
    69         running = false;
    70 
    71         // initalize netdb and setuid
    72         protlib::tsdb::init();
    73         protlib::setuid::init();
    74 
    75         // set tcp parameters
    76         port_t port = this->port; // port
    77         TPoverTCPParam tppar(4, get_message_length, port);
    78 
    79         // create receiver thread
    80         FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
    81         QueueManager::instance()->register_queue(tpchecker_fq,
    82                         message::qaddr_signaling);
    83 
    84         // start thread
    85         pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
    86         tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
    87         tpthread->start_processing();
    88 }
    89 
    90 void tcpip::stop() {
    91         // stop receiver thread
    92         done = true;
    93 
    94         // stop TPoverTCP
    95         tpthread->stop_processing();
    96         tpthread->abort_processing(true);
    97         tpthread->wait_until_stopped();
    98 
    99         // unregister TPoverTCP
    100         delete QueueManager::instance()->get_queue( message::qaddr_signaling );
    101         QueueManager::instance()->unregister_queue( message::qaddr_signaling );
    102 
    103         // destroy QueueManager
    104         QueueManager::clear();
    105 
    106         // de-initalize netdb and setuid
    107         protlib::setuid::end();
    108         protlib::tsdb::end();
    109 
    110         // wait for thread to finish and delete
    111         pthread_join(tpreceivethread, NULL);
    112 }
    113 
    114 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
    115 
    116         // prepare netmsg with length and and port
    117         NetMsg* datamsg = new NetMsg(size + 6);
    118         datamsg->encode32( size + 2,  true );
    119         datamsg->encode16( this->port,true );
    120 
    121         for (size_t i=0; i<size; i++)
    122                 datamsg->encode8( data[i],true );
    123 
    124         // send message
    125         tcpip_endpoint endpoint = *remote;
    126         appladdress peer = convert(endpoint);
    127 
    128         // add to output queue
    129         tpthread->get_thread_object()->send( datamsg, peer, false );
    130 }
    131 
    132 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
    133         // send a message to each combination of ip-address and port
    134         BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
    135                 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
    136                         tcpip_endpoint endpoint(ip,port);
    137                         address_vf vf = endpoint;
    138                         send(vf,data,size);
    139                 }
    140         }
    141 }
    142 
    143 void tcpip::terminate( const address_v* remote) {
    144         tcpip_endpoint endpoint = *remote;
    145         appladdress peer = convert(endpoint);
    146         peer.convert_to_ipv6();
    147         tpthread->get_thread_object()->terminate( peer );
    148 }
    149 
    150 void tcpip::register_listener( transport_listener* listener ) {
    151         this->listener = listener;
    152 }
    153 
    154 void* tcpip::receiverThread( void* ptp ) {
    155         // get reference to transport object
    156         tcpip& tp = *((tcpip*)ptp);
    157 
    158         // get queue
    159         FastQueue* fq =
    160                 QueueManager::instance()->get_queue(message::qaddr_signaling);
    161 
    162         // main processing loop
    163         tp.running = true;
    164         while (!tp.done) {
    165 
    166                 // wait for new message to approach
    167                 message* msg = fq->dequeue_timedwait(300);
    168 
    169                 // message has arrived? no-> continue
    170                 if (!msg) continue;
    171 
    172                 // handle transport message
    173                 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
    174                 if (!tpmsg) {
    175                         delete msg;
    176                         continue;
    177                 }
    178 
    179                 // get address & message
    180                 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
    181                 const appladdress* local_peer  = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
    182                 NetMsg* datamsg = tpmsg->get_message();
    183 
    184                 // not a data message? -> continue!
    185                 if (!datamsg) {
    186                         delete tpmsg;
    187                         continue;
    188                 }
    189 
    190                 // get length and remote endpoint port
    191                 datamsg->set_pos(0);
    192                 uint32_t message_size = datamsg->decode32(true)-2;
    193                 //uint16_t remote_port = datamsg->decode16(true);
    194 
    195                 // inform listener
    196                 if (tp.listener != NULL) {
    197                         tcpip_endpoint remote = convert(remote_peer);
    198                         tcpip_endpoint local  = convert(local_peer);
    199                         tp.listener->receive_message(
    200                                         &tp, local, remote, datamsg->get_buffer()+6, message_size );
    201                 }
    202 
    203                 tpmsg->set_message(NULL);
    204                 delete datamsg;
    205                 delete tpmsg;
    206         }
    207         // clean queue & stop
    208         fq->cleanup();
    209         tp.running = false;
    210         return NULL;
     15typedef boost::mutex::scoped_lock unique_lock;
     16
     17tcpip::tcpip( const tcp::endpoint& endp )  :
     18        listener(NULL),
     19        acceptor(u_io_service.get_asio_io_service(), endp)
     20{
     21}
     22
     23tcpip::~tcpip(){}
     24
     25void tcpip::start()
     26{
     27    // open server socket
     28    accept();
     29   
     30    u_io_service.start();
     31}
     32
     33
     34void tcpip::stop()
     35{
     36    acceptor.close();
     37   
     38    u_io_service.stop();
     39}
     40
     41
     42/* see header file for comments */
     43void tcpip::send(
     44        const tcp::endpoint& dest_addr,
     45        reboost::message_t message,
     46        uint8_t priority)
     47{
     48    ConnPtr conn;
     49    bool need_to_connect = false;
     50   
     51    {
     52        unique_lock lock(connections_lock);
     53       
     54        ConnectionMap::iterator it = connections.find(dest_addr);
     55        if (it == connections.end())
     56        {
     57            ConnPtr tmp_ptr(
     58                    new tcpip_connection(
     59                            u_io_service.get_asio_io_service(),
     60                            shared_from_this() )
     61                    );
     62            conn = tmp_ptr;
     63           
     64            conn->partner = dest_addr;
     65            conn->remote = convert_address(dest_addr);
     66           
     67            // Note: starting the send is the obligation of the connect_handler
     68            // (avoids trying to send while not connected yet)
     69            conn->sending =  true;
     70            need_to_connect = true;
     71           
     72            ConnectionMap::value_type item(dest_addr, conn);
     73            connections.insert(item);
     74           
     75        } else {
     76            conn = it->second;
     77        }
     78    }
     79   
     80   
     81    // * the actual send *
     82    conn->enqueue_for_sending(message, priority);
     83   
     84    // if new connection connect to the other party
     85    if ( need_to_connect )
     86    {
     87        conn->sock.async_connect(
     88                dest_addr,
     89                boost::bind(
     90                        &tcpip_connection::async_connect_handler,
     91                        conn,
     92                        boost::asio::placeholders::error));
     93    }
     94}
     95
     96
     97/* see header file for comments */
     98void tcpip::send(
     99        const address_v* remote,
     100        reboost::message_t message,
     101        uint8_t priority)
     102{
     103    send(convert_address(remote), message, priority);
     104}
     105
     106
     107/* see header file for comments */
     108void tcpip::send(
     109        const endpoint_set& endpoints,
     110        reboost::message_t message,
     111        uint8_t priority )
     112{
     113    // network interfaces scope_ids, for link-local connections (lazy initialization)
     114    vector<uint64_t> scope_ids;
     115   
     116    // send a message to each combination of address-address and port
     117    BOOST_FOREACH( const ip_address address, endpoints.ip ) {
     118        BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
     119            tcp::endpoint endp(address.asio(), port.asio());
     120           
     121            // special treatment for link local addresses
     122            //   ---> send over all (suitable) interfaces
     123            if ( endp.address().is_v6() )
     124            {
     125                boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
     126               
     127                if ( v6_addr.is_link_local() )
     128                {
     129                    // initialize scope_ids
     130                    if ( scope_ids.size() == 0 )
     131                        scope_ids = get_interface_scope_ids();
     132                   
     133                    BOOST_FOREACH ( uint64_t id, scope_ids )
     134                    {                       
     135                        v6_addr.scope_id(id);
     136                        endp.address(v6_addr);
     137   
     138                        logging_debug("------> SEND TO (link-local): " << endp);
     139                        // * send *
     140                        send(endp, message, priority);
     141                    }
     142                }
     143               
     144                continue;
     145            }
     146           
     147            // * send *
     148            send(endp, message, priority);
     149        }
     150    }
     151}
     152
     153
     154void tcpip::register_listener( transport_listener* listener )
     155{
     156    this->listener = listener;
     157}
     158
     159
     160void tcpip::terminate( const address_v* remote )
     161{
     162    terminate(convert_address(remote));
     163}
     164
     165void tcpip::terminate( const tcp::endpoint& remote )
     166{
     167    ConnPtr conn;
     168   
     169    // find and forget connection
     170    {
     171        unique_lock lock(connections_lock);
     172       
     173        ConnectionMap::iterator it = connections.find(remote);
     174        if (it == connections.end())
     175        {
     176            return;
     177        }
     178       
     179        conn = it->second;
     180       
     181        connections.erase(it);
     182    }
     183
     184    // close connection
     185    boost::system::error_code ec;
     186    conn->sock.shutdown(tcp::socket::shutdown_both, ec);
     187    conn->sock.close(ec);
     188}
     189
     190
     191/* private */
     192void tcpip::accept()
     193{
     194    // create new connection object
     195    ConnPtr conn(
     196            new tcpip_connection(
     197                    u_io_service.get_asio_io_service(),
     198                    shared_from_this()
     199            )
     200    );
     201   
     202    // wait for incoming connection
     203    acceptor.async_accept(
     204            conn->sock,
     205            boost::bind(&self::async_accept_handler,
     206                    this->shared_from_this(),
     207                    conn,
     208                    boost::asio::placeholders::error)
     209    );
     210}
     211
     212void tcpip::async_accept_handler(ConnPtr conn, const error_code& error)
     213{
     214    if ( ! error )
     215    {
     216        conn->partner = conn->sock.remote_endpoint();
     217        conn->remote = convert_address(conn->partner);
     218        conn->local = convert_address(conn->sock.local_endpoint());
     219       
     220        {
     221            unique_lock lock(connections_lock);
     222           
     223            ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
     224            connections.insert(item);
     225        }
     226       
     227        // read
     228        conn->listen();
     229    }
     230   
     231    // accept further connections
     232    accept();
     233}
     234
     235inline tcp::endpoint tcpip::convert_address( const address_v* address )
     236{
     237    tcpip_endpoint endpoint = *address;
     238   
     239    return tcp::endpoint(
     240        endpoint.address().asio(), endpoint.port().value()
     241    );
     242}
     243
     244
     245inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint)
     246{
     247    ip_address address;
     248    address.asio(endpoint.address());
     249    tcp_port_address port;
     250    port.value(endpoint.port());
     251    return tcpip_endpoint(address, port);
     252}
     253
     254
     255vector<uint64_t> tcpip::get_interface_scope_ids()
     256{
     257    vector<uint64_t> ret;
     258   
     259    struct ifaddrs* ifaceBuffer = NULL;
     260    void*           tmpAddrPtr  = NULL;
     261   
     262    int ok = getifaddrs( &ifaceBuffer );
     263    if( ok != 0 ) return ret;
     264
     265    for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
     266
     267        // ignore devices that are disabled or have no ip
     268        if(i == NULL) continue;
     269        struct sockaddr* addr = i->ifa_addr;
     270        if (addr==NULL) continue;
     271
     272        // only use ethX and wlanX devices
     273        string device = string(i->ifa_name);
     274        if ( (device.find("eth") == string::npos) &&
     275              (device.find("wlan")  == string::npos) /* &&
     276              (device.find("lo")  == string::npos) XXX */ )
     277        {
     278            continue;
     279        }
     280
     281        // only use interfaces with ipv6 link-local addresses
     282        if (addr->sa_family == AF_INET6)
     283        {
     284            // convert address
     285            // TODO should be possible without detour over strings
     286            char straddr[INET6_ADDRSTRLEN];
     287            tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
     288            inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     289
     290            address_v6 v6addr = address_v6::from_string(straddr);
     291            if ( v6addr.is_link_local() )
     292            {
     293                // * append the scope_id to the return vector *
     294                ret.push_back(if_nametoindex(i->ifa_name));
     295            }
     296
     297        }
     298    }
     299
     300    freeifaddrs(ifaceBuffer);
     301   
     302    return ret;
     303}
     304
     305
     306/*****************
     307 ** inner class **
     308 *****************/
     309
     310tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent)  :
     311        sock(io_service),
     312        valid(true),
     313        parent(parent),
     314        out_queues(8), //TODO How much priorities shall we have?
     315        sending(false)
     316{
     317        header.length = 0;
     318        header.prot = 0;
     319}
     320
     321/*-------------------------------------------
     322 | implement transport_connection interface |
     323 -------------------------------------------*/
     324void tcpip::tcpip_connection::send(
     325        reboost::message_t message,
     326        uint8_t priority)
     327{
     328    enqueue_for_sending(message, priority);
     329}
     330
     331
     332address_vf tcpip::tcpip_connection::getLocalEndpoint()
     333{
     334    return local;
     335}
     336
     337
     338address_vf tcpip::tcpip_connection::getRemoteEndpoint()
     339{
     340    return remote;
     341}
     342
     343
     344void tcpip::tcpip_connection::terminate()
     345{
     346    parent->terminate(partner);
     347}
     348
     349
     350/*------------------------------
     351 | things we defined ourselves |
     352 ------------------------------*/
     353void tcpip::tcpip_connection::async_connect_handler(const error_code& error)
     354{
     355    if (error)
     356    {
     357        parent->terminate(partner);
     358
     359        return;
     360    }
     361   
     362    // save address in ariba format
     363    local = parent->convert_address(sock.local_endpoint());
     364   
     365    // Note: sending has to be true at this point
     366    send_next_package();
     367   
     368    listen();
     369}
     370
     371
     372void tcpip::tcpip_connection::listen()
     373{
     374    boost::asio::async_read(
     375            this->sock,
     376            boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
     377            boost::bind(
     378                    &tcpip::tcpip_connection::async_read_header_handler,
     379                    this->shared_from_this(),
     380                    boost::asio::placeholders::error,
     381                    boost::asio::placeholders::bytes_transferred
     382            )
     383    );
     384}
     385
     386
     387void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
     388{
     389    if (error)
     390    {
     391        parent->terminate(partner);
     392
     393        return;
     394    }
     395
     396    // convert byte order
     397    header.length = ntohl(header.length);
     398    header.length -= 2;  // XXX protlib
     399   
     400    assert(header.length > 0);
     401   
     402    // new buffer for the new packet
     403    buffy = shared_buffer_t(header.length);
     404
     405    // * read data *
     406    boost::asio::async_read(
     407            this->sock,
     408            boost::asio::buffer(buffy.mutable_data(), buffy.size()),
     409            boost::bind(
     410                    &tcpip::tcpip_connection::async_read_data_handler,
     411                    this->shared_from_this(),
     412                    boost::asio::placeholders::error,
     413                    boost::asio::placeholders::bytes_transferred
     414            )
     415    );
     416}
     417
     418void tcpip::tcpip_connection::async_read_data_handler(
     419        const error_code& error, size_t bytes_transferred)
     420{
     421    if (error)
     422    {
     423        parent->terminate(partner);
     424
     425        return;
     426    }
     427   
     428    message_t msg;
     429    msg.push_back(buffy);
     430    buffy = shared_buffer_t();
     431
     432    if ( parent->listener )
     433        parent->listener->receive_message(shared_from_this(), msg);
     434   
     435    listen();
     436}
     437
     438/* see header file for comments */
     439void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
     440{
     441    if ( error )
     442    {       
     443        // remove this connection
     444        parent->terminate(partner);
     445
     446        return;
     447    }
     448   
     449    send_next_package();
     450}
     451
     452
     453
     454void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority)
     455{
     456    bool restart_sending = false;
     457   
     458    // enqueue packet  [locked]
     459    {
     460        unique_lock(out_queues_lock);
     461       
     462        assert( priority < out_queues.size() );
     463        out_queues[priority].push(packet);
     464       
     465        if ( ! sending )
     466        {
     467            restart_sending = true;
     468            sending = true;
     469        }
     470    }
     471   
     472    // if sending was stopped, we have to restart it here
     473    if ( restart_sending )
     474    {
     475        send_next_package();
     476    }
     477}
     478
     479/* see header file for comments */
     480void tcpip::tcpip_connection::send_next_package()
     481{
     482    Packet packet;
     483    bool found = false;
     484
     485    // find packet with highest priority  [locked]
     486    {
     487        unique_lock(out_queues_lock);
     488       
     489        for ( vector<OutQueue>::iterator it = out_queues.begin();
     490                it != out_queues.end(); it++ )
     491        {
     492            if ( !it->empty() )
     493            {
     494                packet = it->front();
     495                it->pop();
     496                found = true;
     497               
     498                break;
     499            }
     500        }
     501       
     502        // no packets waiting --> stop sending
     503        if ( ! found )
     504        {
     505            sending = false;
     506        }
     507    }
     508   
     509    // * send *
     510    if ( found )
     511    {
     512        reboost::shared_buffer_t header_buf(sizeof(header_t));
     513        header_t* header = (header_t*)(header_buf.mutable_data());
     514        header->length = htonl(packet.size()+2);  // XXX protlib
     515       
     516        packet.push_front(header_buf);
     517       
     518        // "convert" message to asio buffer sequence
     519        vector<boost::asio::const_buffer> send_sequence(packet.length());
     520        for ( int i=0; i < packet.length(); i++ )
     521        {
     522            shared_buffer_t b = packet.at(i);
     523            send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
     524        }
     525       
     526        // * async write *
     527        boost::asio::async_write(
     528                this->sock,
     529                send_sequence,
     530                boost::bind(
     531                        &tcpip::tcpip_connection::async_write_handler,
     532                        this->shared_from_this(),
     533                        packet,  // makes sure our shared pointer lives long enough ;-)
     534                        boost::asio::placeholders::error,
     535                        boost::asio::placeholders::bytes_transferred)
     536        );
     537    }
    211538}
    212539
  • source/ariba/utility/transport/tcpip/tcpip.hpp

    r5993 r10653  
    33
    44#include "ariba/utility/transport/transport.hpp"
    5 #include <pthread.h>
    6 
    7 // forward declaration
    8 namespace protlib {
    9 template<class X, class Y>
    10 class ThreadStarter;
    11 class TPoverTCP;
    12 class TPoverTCPParam;
    13 }
     5#include "ariba/utility/transport/asio/unique_io_service.h"
     6#include "ariba/utility/transport/transport_connection.hpp"
     7#include "ariba/utility/addressing/tcpip_endpoint.hpp"
     8#include <boost/asio.hpp>
     9#include <boost/shared_ptr.hpp>
     10#include <boost/enable_shared_from_this.hpp>
     11#include <queue>
     12#include "ariba/utility/transport/messages/buffers.hpp"
     13#include "ariba/utility/logging/Logging.h"
    1414
    1515namespace ariba {
    1616namespace transport {
    1717
    18 using namespace protlib;
     18using namespace std;
     19using ariba::transport::detail::unique_io_service;
     20using ariba::addressing::tcpip_endpoint;
     21using boost::asio::ip::tcp;
     22using boost::asio::ip::address_v6;
     23using boost::system::error_code;
     24using reboost::shared_buffer_t;
     25using reboost::message_t;
    1926
    20 /**
    21  * TODO: Doc
    22  *
    23  * @author Sebastian Mies <mies@tm.uka.de>
    24  */
    25 class tcpip : public transport_protocol {
     27class tcpip;
     28typedef boost::shared_ptr<tcpip> TcpIpPtr;
     29
     30class tcpip :
     31    public transport_protocol,
     32    public boost::enable_shared_from_this<tcpip>
     33{
     34    typedef tcpip self;
     35use_logging_h(tcpip)
     36
     37private:
     38    class tcpip_connection :
     39        public transport_connection,
     40        public boost::enable_shared_from_this<tcpip_connection>
     41    {
     42    public:
     43        typedef reboost::message_t Packet;
     44        typedef std::queue<Packet> OutQueue;
     45       
     46        struct header_t
     47        {
     48            uint32_t length;
     49            uint16_t prot;  // XXX protlib
     50        } __attribute__((packed));
     51           
     52        tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent);
     53       
     54        /// Inherited from transport_connection
     55        virtual void send(reboost::message_t message, uint8_t priority = 0);
     56        virtual address_vf getLocalEndpoint();
     57        virtual address_vf getRemoteEndpoint();
     58        virtual void terminate();
     59       
     60        void listen();
     61       
     62        void async_connect_handler(const error_code& error);
     63       
     64        void async_read_header_handler(const error_code& error, size_t bytes_transferred);
     65        void async_read_data_handler(const error_code& error, size_t bytes_transferred);
     66       
     67        /*
     68         * is called from asio when write operation "returns",
     69         * calls private function `send_next_package()`
     70         */
     71        void async_write_handler(
     72                reboost::shared_buffer_t packet,
     73                const error_code& error,
     74                size_t bytes_transferred);
     75
     76       
     77        void enqueue_for_sending(Packet packet, uint8_t priority);
     78       
     79    private:
     80        /*
     81         * is called from `send` or `async_write_handler` to begin/keep sending
     82         * sends the next message with the highest priority in this connection
     83         */
     84        void send_next_package();
     85
     86
     87    public:
     88        tcp::socket sock;
     89        bool valid;
     90        TcpIpPtr parent;
     91       
     92        tcp::endpoint partner;
     93        tcpip_endpoint remote;
     94        tcpip_endpoint local;
     95       
     96        vector<OutQueue> out_queues;     // to be locked with out_queues_lock
     97        boost::mutex out_queues_lock;
     98       
     99        bool sending;       // to be locked with out_queues_lock
     100       
     101        header_t header;
     102        shared_buffer_t buffy;
     103    };
     104    typedef boost::shared_ptr<tcpip_connection> ConnPtr;
     105    typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap;
     106   
    26107public:
    27         tcpip( uint16_t port );
     108        tcpip( const tcp::endpoint& endp );
    28109        virtual ~tcpip();
    29110        virtual void start();
    30111        virtual void stop();
    31         virtual void send( const address_v* remote, const uint8_t* data, size_t size );
    32         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size );
     112       
     113        /**
     114     * enqueues message for sending
     115     * create new connection if necessary
     116     * starts sending mechanism (if not already running)
     117     */
     118    void send(
     119            const tcp::endpoint&,
     120            reboost::message_t message,
     121            uint8_t priority = 0 );
     122       
     123        /**
     124         * Converts address_v to tcp::endpoint and calls the real send() function
     125         */
     126        virtual void send(
     127                const address_v* remote,
     128                reboost::message_t message,
     129                uint8_t priority = 0 );
     130       
     131        /**
     132         * calls send for each destination endpoint in `endpoint_set& endpoints`
     133         */
     134        virtual void send(
     135                const endpoint_set& endpoints,
     136                reboost::message_t message,
     137                uint8_t priority = 0 );
     138       
    33139        virtual void terminate( const address_v* remote );
     140        virtual void terminate( const tcp::endpoint& remote );
    34141        virtual void register_listener( transport_listener* listener );
    35142
     143       
     144    /**
     145     *  returns a vector of (interesting) network interfaces
     146     * 
     147     *  [NOTE: The current implementation returns the scope_ids of
     148     *  all ethX and wlanX network interfaces, to be used for
     149     *  connections to link-local ipv6 addresses.]
     150     * 
     151     *  TODO move to ariba/communication/networkinfo/AddressDiscovery ??
     152     * 
     153     */
     154    static vector<uint64_t> get_interface_scope_ids();
     155
    36156private:
    37         volatile bool done, running;
    38         uint16_t port;
    39         pthread_t tpreceivethread;
    40         ThreadStarter<TPoverTCP, TPoverTCPParam>* tpthread;
    41         static void* receiverThread( void* ptp );
     157        void accept();
     158        void async_accept_handler(ConnPtr conn, const error_code& error);
     159        tcp::endpoint convert_address(const address_v* endpoint);
     160        tcpip_endpoint convert_address(const tcp::endpoint& endpoint);
     161       
     162private:
    42163        transport_listener* listener;
     164        unique_io_service u_io_service;
     165        tcp::acceptor acceptor;
     166       
     167        ConnectionMap connections;
     168        boost::mutex connections_lock;
    43169};
    44170
  • source/ariba/utility/transport/transport.hpp

    r5284 r10653  
    88// transport protocol implementations
    99#include "tcpip/tcpip.hpp"
    10 #include "rfcomm/rfcomm.hpp"
     10#include "rfcomm/rfcomm_transport.hpp"
    1111
    1212// common transport peer using all known protocols
  • source/ariba/utility/transport/transport_listener.hpp

    r5993 r10653  
    55
    66#include "ariba/utility/addressing/addressing.hpp"
     7#include "ariba/utility/transport/messages/buffers.hpp"
     8#include "ariba/utility/transport/transport_connection.hpp"
    79
    810// namespace ariba::transport
     
    1113
    1214using namespace ariba::addressing;
    13 
    14 class transport_protocol;
    1515
    1616/**
     
    2121class transport_listener {
    2222public:
     23    /// Allow deleting implementing classes by pointer
     24    virtual ~transport_listener() {}
     25   
    2326        /// called when a message is received
    2427        virtual void receive_message(
    25                 transport_protocol* transport,
    26                 const address_vf local, const address_vf remote,
    27                 const uint8_t* data, size_t size
     28        transport_connection::sptr connection,
     29                reboost::message_t msg
    2830        ) {
    2931                std::cout << "transport_listener: not implemented" << std::endl;
  • source/ariba/utility/transport/transport_peer.cpp

    r7834 r10653  
    33#include "transport_peer.hpp"
    44#include "transport.hpp"
     5#include "ariba/utility/logging/Logging.h"
     6#include <boost/asio/ip/tcp.hpp>
     7#include <boost/asio/error.hpp>
     8#include <boost/foreach.hpp>
     9
     10#ifdef ECLIPSE_PARSER
     11    #define foreach(a, b) for(a : b)
     12#else
     13    #define foreach(a, b) BOOST_FOREACH(a, b)
     14#endif
    515
    616// namespace ariba::transport
     
    919
    1020using namespace ariba::addressing;
     21using boost::asio::ip::tcp;
     22
     23#ifdef HAVE_LIBBLUETOOTH
     24using boost::asio::bluetooth::rfcomm;
     25#endif
     26
     27use_logging_cpp(transport_peer);
    1128
    1229transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    13         // setup tcp transports
    14         tcp = NULL;
    15         //cout << "#tcpip_transports = " << local.tcp.size() << endl;
    16         if (local.tcp.size()==1) {
    17                 tcp = new tcpip(local.tcp.begin()->value());
    18                 //cout << "Started tcpip_transport on port "  << local.tcp.begin()->value() << endl;
    19         }
    20 
     30   
     31    // setup tcp transports
     32    foreach(tcp_port_address port, local.tcp) {
     33       
     34        if (local.ip.size() > 0) {
     35            foreach(ip_address ip_addr, local.ip) {
     36               
     37                tcp::endpoint endp(ip_addr.asio(), port.asio());
     38                create_service(endp);
     39            }
     40        } else {
     41            tcp::endpoint endp_v6(tcp::v6(), port.asio());
     42            tcp::endpoint endp_v4(tcp::v4(), port.asio());
     43           
     44            create_service(endp_v6);
     45            create_service(endp_v4);
     46        }
     47       
     48    }
     49   
    2150        #ifdef HAVE_LIBBLUETOOTH
    22         // setup rfcomm transports
    23         rfc = NULL;
    24         //cout << "#rfcomm_transports = " << local.rfcomm.size() << endl;
    25         if ( local.rfcomm.size() == 1 ) {
    26                 rfc = new rfcomm( local.rfcomm.begin()->value() );
    27                 //cout << "Started rfcomm_transport on port "  << local.rfcomm.begin()->value() << endl;
    28         }
     51    foreach(rfcomm_channel_address channel, local.rfcomm) {
     52        if (local.bluetooth.size() > 0) {
     53                foreach(mac_address mac, local.bluetooth) {
     54                        rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     55                        create_service(endp);
     56                }
     57        } else {
     58                rfcomm::endpoint endp(channel.value());
     59                create_service(endp);
     60        }
     61    }
    2962        #endif
    3063}
    3164
     65void transport_peer::create_service(tcp::endpoint endp) {
     66    try {
     67        TcpIpPtr tmp_ptr(new tcpip(endp));
     68        tcps.push_back(tmp_ptr);
     69        logging_info("Listening on IP/TCP " << endp);
     70       
     71    } catch (boost::system::system_error& e) {
     72        if (e.code() == boost::asio::error::address_in_use) {
     73            logging_warn("[WARN] Address already in use: "
     74                    << endp << ". Endpoint will be ignored!");
     75        } else {
     76            // Rethrow
     77            throw;
     78        }
     79    }
     80}
     81
     82#ifdef HAVE_LIBBLUETOOTH
     83void transport_peer::create_service(rfcomm::endpoint endp) {
     84    try {
     85        rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
     86        rfcomms.push_back(tmp_ptr);
     87        logging_info("Listening on bluetooth/RFCOMM " << endp);
     88       
     89    } catch (boost::system::system_error& e) {
     90        if (e.code() == boost::asio::error::address_in_use) {
     91            logging_warn("[WARN] Address already in use: "
     92                    << endp << ". Endpoint will be ignored!");
     93        } else {
     94            // Rethrow
     95            throw;
     96        }
     97    }
     98}
     99#endif
     100
    32101transport_peer::~transport_peer() {
    33         if (tcp !=NULL ) delete tcp;
    34 #ifdef HAVE_LIBBLUETOOTH
    35         if (rfc !=NULL ) delete rfc;
    36 #endif
    37102}
    38103
    39104void transport_peer::start() {
    40         if (tcp!=NULL) tcp->start();
     105    foreach(TcpIpPtr tcp, tcps) {
     106        tcp->start();
     107    }
     108   
    41109#ifdef HAVE_LIBBLUETOOTH
    42         if (rfc!=NULL) rfc->start();
     110    foreach(rfcomm_transport::sptr x, rfcomms) {
     111        x->start();
     112    }
    43113#endif
    44114}
    45115
    46116void transport_peer::stop() {
    47         if (tcp!=NULL) tcp->stop();
     117    foreach(TcpIpPtr tcp, tcps) {
     118        tcp->stop();
     119    }
     120   
    48121#ifdef HAVE_LIBBLUETOOTH
    49         if (rfc!=NULL) rfc->stop();
     122        foreach(rfcomm_transport::sptr x, rfcomms) {
     123                x->stop();
     124        }
    50125#endif
    51126}
    52127
    53 void transport_peer::send( const address_v* remote, const uint8_t* data, size_t size ) {
    54         if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) {
    55                 tcp->send(remote,data,size);
    56         } else
     128
     129void transport_peer::send(
     130        const endpoint_set& endpoints,
     131        reboost::message_t message,
     132        uint8_t priority)
     133{
     134    foreach(TcpIpPtr tcp, tcps) {
     135        tcp->send(endpoints, message, priority);
     136    }
     137   
    57138#ifdef HAVE_LIBBLUETOOTH
    58         if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) {
    59                 rfc->send(remote,data,size);
    60         } else
    61 #endif
    62                 cerr << "Could not send message to " << remote->to_string() << endl;
    63 }
    64 
    65 void transport_peer::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
    66         if (tcp!=NULL) tcp->send(endpoints,data,size);
    67 #ifdef HAVE_LIBBLUETOOTH
    68         if (rfc!=NULL) rfc->send(endpoints,data,size);
     139    foreach(rfcomm_transport::sptr x, rfcomms) {
     140                x->send(endpoints, message, priority);
     141        }
    69142#endif
    70143}
    71144
    72145void transport_peer::terminate( const address_v* remote ) {
    73         if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL)
    74                 tcp->terminate(remote);
     146        if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     147        {
     148            foreach(TcpIpPtr tcp, tcps) {
     149                tcp->terminate(remote);
     150            }
     151        }
    75152#ifdef HAVE_LIBBLUETOOTH
    76         if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL)
    77                 rfc->terminate(remote);
     153        if (remote->instanceof<rfcomm_endpoint>()) {
     154                foreach(rfcomm_transport::sptr x, rfcomms) {
     155                        x->terminate(remote);
     156                }
     157        }
    78158#endif
    79159}
    80160
    81161void transport_peer::register_listener( transport_listener* listener ) {
    82         if (tcp!=NULL) tcp->register_listener(listener);
     162    foreach(TcpIpPtr tcp, tcps) {
     163        tcp->register_listener(listener);
     164    }
     165   
    83166#ifdef HAVE_LIBBLUETOOTH
    84         if (rfc!=NULL) rfc->register_listener(listener);
     167    foreach(rfcomm_transport::sptr x, rfcomms) {
     168        x->register_listener(listener);
     169    }
    85170#endif
    86171}
  • source/ariba/utility/transport/transport_peer.hpp

    r9324 r10653  
    55#include "transport_protocol.hpp"
    66#include "ariba/utility/addressing/endpoint_set.hpp"
     7#include <boost/shared_ptr.hpp>
     8#include "rfcomm/bluetooth_rfcomm.hpp"
     9
    710
    811// namespace ariba::transport
     
    1316
    1417class tcpip;
     18
    1519#ifdef HAVE_LIBBLUETOOTH
    16 class rfcomm;
     20class rfcomm_transport;
    1721#endif
    1822
     
    3034        virtual void start();
    3135        virtual void stop();
    32         virtual void send( const address_v* remote, const uint8_t* data, size_t size );
    33         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size );
     36       
     37        virtual void send(
     38                const endpoint_set& endpoints,
     39                reboost::message_t message,
     40                uint8_t priority = 0);
     41       
     42        /// @deprecated: Use terminate() from transport_connection instead
    3443        virtual void terminate( const address_v* remote );
     44       
    3545        virtual void register_listener( transport_listener* listener );
    3646
    3747private:
     48        void create_service(tcp::endpoint endp);
     49#ifdef HAVE_LIBBLUETOOTH
     50        void create_service(boost::asio::bluetooth::rfcomm::endpoint endp);
     51#endif
     52       
    3853        endpoint_set&  local;
    39         tcpip* tcp;
     54        std::vector< boost::shared_ptr<tcpip> > tcps;
    4055#ifdef HAVE_LIBBLUETOOTH
    41         rfcomm* rfc;
     56        std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms;
    4257#endif
    4358};
  • source/ariba/utility/transport/transport_protocol.hpp

    r5993 r10653  
    33
    44#include "ariba/utility/addressing/addressing.hpp"
    5 #include "transport_listener.hpp"
     5#include "ariba/utility/transport/transport_listener.hpp"
     6#include "ariba/utility/transport/messages/message.hpp"
    67
    78// namespace ariba::transport
     
    1819class transport_protocol {
    1920public:
     21    /// Allow deleting implementing classes by pointer
     22    virtual ~transport_protocol() {}
     23   
    2024        virtual void start() = 0;
    2125        virtual void stop() = 0;
    22         virtual void send( const address_v* remote, const uint8_t* data, size_t size ) = 0;
    23         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) = 0;
     26       
     27        virtual void send(
     28                const endpoint_set& endpoints,
     29                reboost::message_t message,
     30                uint8_t priority = 0) = 0;
     31       
     32        /// @deprecated: Use terminate() from transport_connection instead
    2433        virtual void terminate( const address_v* remote ) = 0;
     34       
    2535        virtual void register_listener( transport_listener* listener ) = 0;
    2636};
Note: See TracChangeset for help on using the changeset viewer.