Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (12 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

Location:
source/ariba/utility/transport
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/tcpip/tcpip.cpp

    r10075 r10653  
    11#include "tcpip.hpp"
    22
    3 #define _NO_LOGGING
    4 
    5 // std includes
    6 #include <unistd.h>
    7 #include <iostream>
    8 #include <string>
    9 #include <sstream>
    10 #include <boost/foreach.hpp>
    11 
    12 // protlib includes
    13 #include "protlib/network_message.h"
    14 #include "protlib/tp_over_tcp.h"
    15 #include "protlib/tperror.h"
    16 #include "protlib/logfile.h"
    17 #include "protlib/queuemanager.h"
    18 #include "protlib/threadsafe_db.h"
    19 #include "protlib/setuid.h"
    20 
    21 // protlib namespaces
    22 using namespace protlib;
    23 using namespace protlib::log;
    24 
    25 logfile commonlog;
    26 protlib::log::logfile& protlib::log::DefaultLog(commonlog);
     3#include <boost/array.hpp>
     4
     5// interface discovery for link-local destinations
     6#include <ifaddrs.h>
    277
    288namespace ariba {
    299namespace transport {
    3010
     11use_logging_cpp(tcpip)
     12
    3113using namespace ariba::addressing;
    3214
    33 
    34 tcpip_endpoint convert( const appladdress* addr ) {
    35         const char* ip_str = addr->get_ip_str();
    36         tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
    37         return endpoint;
    38 }
    39 
    40 appladdress convert( const tcpip_endpoint& endpoint ) {
    41         tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
    42         appladdress
    43                 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
    44 //      cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
    45         return peer;
    46 }
    47 
    48 tcpip::tcpip( uint16_t port ) :
    49         done ( false ),
    50         running ( false ),
    51         port( port ),
    52         tpreceivethread ( NULL ),
    53         tpthread ( NULL ),
    54         listener ( NULL ) {
    55 }
    56 
    57 tcpip::~tcpip() {
    58         if (running) stop();
    59 }
    60 
    61 bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
    62         clen_bytes = m.decode32();
    63         m.set_pos_r(-4);
    64         return true;
    65 }
    66 
    67 void tcpip::start() {
    68         done = false;
    69         running = false;
    70 
    71         // initalize netdb and setuid
    72         protlib::tsdb::init();
    73         protlib::setuid::init();
    74 
    75         // set tcp parameters
    76         port_t port = this->port; // port
    77         TPoverTCPParam tppar(4, get_message_length, port);
    78 
    79         // create receiver thread
    80         FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
    81         QueueManager::instance()->register_queue(tpchecker_fq,
    82                         message::qaddr_signaling);
    83 
    84         // start thread
    85         pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
    86         tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
    87         tpthread->start_processing();
    88 }
    89 
    90 void tcpip::stop() {
    91         // stop receiver thread
    92         done = true;
    93 
    94         // stop TPoverTCP
    95         tpthread->stop_processing();
    96         tpthread->abort_processing(true);
    97         tpthread->wait_until_stopped();
    98 
    99         // unregister TPoverTCP
    100         delete QueueManager::instance()->get_queue( message::qaddr_signaling );
    101         QueueManager::instance()->unregister_queue( message::qaddr_signaling );
    102 
    103         // destroy QueueManager
    104         QueueManager::clear();
    105 
    106         // de-initalize netdb and setuid
    107         protlib::setuid::end();
    108         protlib::tsdb::end();
    109 
    110         // wait for thread to finish and delete
    111         pthread_join(tpreceivethread, NULL);
    112 }
    113 
    114 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
    115 
    116         // prepare netmsg with length and and port
    117         NetMsg* datamsg = new NetMsg(size + 6);
    118         datamsg->encode32( size + 2,  true );
    119         datamsg->encode16( this->port,true );
    120 
    121         for (size_t i=0; i<size; i++)
    122                 datamsg->encode8( data[i],true );
    123 
    124         // send message
    125         tcpip_endpoint endpoint = *remote;
    126         appladdress peer = convert(endpoint);
    127 
    128         // add to output queue
    129         tpthread->get_thread_object()->send( datamsg, peer, false );
    130 }
    131 
    132 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
    133         // send a message to each combination of ip-address and port
    134         BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
    135                 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
    136                         tcpip_endpoint endpoint(ip,port);
    137                         address_vf vf = endpoint;
    138                         send(vf,data,size);
    139                 }
    140         }
    141 }
    142 
    143 void tcpip::terminate( const address_v* remote) {
    144         tcpip_endpoint endpoint = *remote;
    145         appladdress peer = convert(endpoint);
    146         peer.convert_to_ipv6();
    147         tpthread->get_thread_object()->terminate( peer );
    148 }
    149 
    150 void tcpip::register_listener( transport_listener* listener ) {
    151         this->listener = listener;
    152 }
    153 
    154 void* tcpip::receiverThread( void* ptp ) {
    155         // get reference to transport object
    156         tcpip& tp = *((tcpip*)ptp);
    157 
    158         // get queue
    159         FastQueue* fq =
    160                 QueueManager::instance()->get_queue(message::qaddr_signaling);
    161 
    162         // main processing loop
    163         tp.running = true;
    164         while (!tp.done) {
    165 
    166                 // wait for new message to approach
    167                 message* msg = fq->dequeue_timedwait(300);
    168 
    169                 // message has arrived? no-> continue
    170                 if (!msg) continue;
    171 
    172                 // handle transport message
    173                 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
    174                 if (!tpmsg) {
    175                         delete msg;
    176                         continue;
    177                 }
    178 
    179                 // get address & message
    180                 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
    181                 const appladdress* local_peer  = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
    182                 NetMsg* datamsg = tpmsg->get_message();
    183 
    184                 // not a data message? -> continue!
    185                 if (!datamsg) {
    186                         delete tpmsg;
    187                         continue;
    188                 }
    189 
    190                 // get length and remote endpoint port
    191                 datamsg->set_pos(0);
    192                 uint32_t message_size = datamsg->decode32(true)-2;
    193                 //uint16_t remote_port = datamsg->decode16(true);
    194 
    195                 // inform listener
    196                 if (tp.listener != NULL) {
    197                         tcpip_endpoint remote = convert(remote_peer);
    198                         tcpip_endpoint local  = convert(local_peer);
    199                         tp.listener->receive_message(
    200                                         &tp, local, remote, datamsg->get_buffer()+6, message_size );
    201                 }
    202 
    203                 tpmsg->set_message(NULL);
    204                 delete datamsg;
    205                 delete tpmsg;
    206         }
    207         // clean queue & stop
    208         fq->cleanup();
    209         tp.running = false;
    210         return NULL;
     15typedef boost::mutex::scoped_lock unique_lock;
     16
     17tcpip::tcpip( const tcp::endpoint& endp )  :
     18        listener(NULL),
     19        acceptor(u_io_service.get_asio_io_service(), endp)
     20{
     21}
     22
     23tcpip::~tcpip(){}
     24
     25void tcpip::start()
     26{
     27    // open server socket
     28    accept();
     29   
     30    u_io_service.start();
     31}
     32
     33
     34void tcpip::stop()
     35{
     36    acceptor.close();
     37   
     38    u_io_service.stop();
     39}
     40
     41
     42/* see header file for comments */
     43void tcpip::send(
     44        const tcp::endpoint& dest_addr,
     45        reboost::message_t message,
     46        uint8_t priority)
     47{
     48    ConnPtr conn;
     49    bool need_to_connect = false;
     50   
     51    {
     52        unique_lock lock(connections_lock);
     53       
     54        ConnectionMap::iterator it = connections.find(dest_addr);
     55        if (it == connections.end())
     56        {
     57            ConnPtr tmp_ptr(
     58                    new tcpip_connection(
     59                            u_io_service.get_asio_io_service(),
     60                            shared_from_this() )
     61                    );
     62            conn = tmp_ptr;
     63           
     64            conn->partner = dest_addr;
     65            conn->remote = convert_address(dest_addr);
     66           
     67            // Note: starting the send is the obligation of the connect_handler
     68            // (avoids trying to send while not connected yet)
     69            conn->sending =  true;
     70            need_to_connect = true;
     71           
     72            ConnectionMap::value_type item(dest_addr, conn);
     73            connections.insert(item);
     74           
     75        } else {
     76            conn = it->second;
     77        }
     78    }
     79   
     80   
     81    // * the actual send *
     82    conn->enqueue_for_sending(message, priority);
     83   
     84    // if new connection connect to the other party
     85    if ( need_to_connect )
     86    {
     87        conn->sock.async_connect(
     88                dest_addr,
     89                boost::bind(
     90                        &tcpip_connection::async_connect_handler,
     91                        conn,
     92                        boost::asio::placeholders::error));
     93    }
     94}
     95
     96
     97/* see header file for comments */
     98void tcpip::send(
     99        const address_v* remote,
     100        reboost::message_t message,
     101        uint8_t priority)
     102{
     103    send(convert_address(remote), message, priority);
     104}
     105
     106
     107/* see header file for comments */
     108void tcpip::send(
     109        const endpoint_set& endpoints,
     110        reboost::message_t message,
     111        uint8_t priority )
     112{
     113    // network interfaces scope_ids, for link-local connections (lazy initialization)
     114    vector<uint64_t> scope_ids;
     115   
     116    // send a message to each combination of address-address and port
     117    BOOST_FOREACH( const ip_address address, endpoints.ip ) {
     118        BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
     119            tcp::endpoint endp(address.asio(), port.asio());
     120           
     121            // special treatment for link local addresses
     122            //   ---> send over all (suitable) interfaces
     123            if ( endp.address().is_v6() )
     124            {
     125                boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
     126               
     127                if ( v6_addr.is_link_local() )
     128                {
     129                    // initialize scope_ids
     130                    if ( scope_ids.size() == 0 )
     131                        scope_ids = get_interface_scope_ids();
     132                   
     133                    BOOST_FOREACH ( uint64_t id, scope_ids )
     134                    {                       
     135                        v6_addr.scope_id(id);
     136                        endp.address(v6_addr);
     137   
     138                        logging_debug("------> SEND TO (link-local): " << endp);
     139                        // * send *
     140                        send(endp, message, priority);
     141                    }
     142                }
     143               
     144                continue;
     145            }
     146           
     147            // * send *
     148            send(endp, message, priority);
     149        }
     150    }
     151}
     152
     153
     154void tcpip::register_listener( transport_listener* listener )
     155{
     156    this->listener = listener;
     157}
     158
     159
     160void tcpip::terminate( const address_v* remote )
     161{
     162    terminate(convert_address(remote));
     163}
     164
     165void tcpip::terminate( const tcp::endpoint& remote )
     166{
     167    ConnPtr conn;
     168   
     169    // find and forget connection
     170    {
     171        unique_lock lock(connections_lock);
     172       
     173        ConnectionMap::iterator it = connections.find(remote);
     174        if (it == connections.end())
     175        {
     176            return;
     177        }
     178       
     179        conn = it->second;
     180       
     181        connections.erase(it);
     182    }
     183
     184    // close connection
     185    boost::system::error_code ec;
     186    conn->sock.shutdown(tcp::socket::shutdown_both, ec);
     187    conn->sock.close(ec);
     188}
     189
     190
     191/* private */
     192void tcpip::accept()
     193{
     194    // create new connection object
     195    ConnPtr conn(
     196            new tcpip_connection(
     197                    u_io_service.get_asio_io_service(),
     198                    shared_from_this()
     199            )
     200    );
     201   
     202    // wait for incoming connection
     203    acceptor.async_accept(
     204            conn->sock,
     205            boost::bind(&self::async_accept_handler,
     206                    this->shared_from_this(),
     207                    conn,
     208                    boost::asio::placeholders::error)
     209    );
     210}
     211
     212void tcpip::async_accept_handler(ConnPtr conn, const error_code& error)
     213{
     214    if ( ! error )
     215    {
     216        conn->partner = conn->sock.remote_endpoint();
     217        conn->remote = convert_address(conn->partner);
     218        conn->local = convert_address(conn->sock.local_endpoint());
     219       
     220        {
     221            unique_lock lock(connections_lock);
     222           
     223            ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
     224            connections.insert(item);
     225        }
     226       
     227        // read
     228        conn->listen();
     229    }
     230   
     231    // accept further connections
     232    accept();
     233}
     234
     235inline tcp::endpoint tcpip::convert_address( const address_v* address )
     236{
     237    tcpip_endpoint endpoint = *address;
     238   
     239    return tcp::endpoint(
     240        endpoint.address().asio(), endpoint.port().value()
     241    );
     242}
     243
     244
     245inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint)
     246{
     247    ip_address address;
     248    address.asio(endpoint.address());
     249    tcp_port_address port;
     250    port.value(endpoint.port());
     251    return tcpip_endpoint(address, port);
     252}
     253
     254
     255vector<uint64_t> tcpip::get_interface_scope_ids()
     256{
     257    vector<uint64_t> ret;
     258   
     259    struct ifaddrs* ifaceBuffer = NULL;
     260    void*           tmpAddrPtr  = NULL;
     261   
     262    int ok = getifaddrs( &ifaceBuffer );
     263    if( ok != 0 ) return ret;
     264
     265    for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
     266
     267        // ignore devices that are disabled or have no ip
     268        if(i == NULL) continue;
     269        struct sockaddr* addr = i->ifa_addr;
     270        if (addr==NULL) continue;
     271
     272        // only use ethX and wlanX devices
     273        string device = string(i->ifa_name);
     274        if ( (device.find("eth") == string::npos) &&
     275              (device.find("wlan")  == string::npos) /* &&
     276              (device.find("lo")  == string::npos) XXX */ )
     277        {
     278            continue;
     279        }
     280
     281        // only use interfaces with ipv6 link-local addresses
     282        if (addr->sa_family == AF_INET6)
     283        {
     284            // convert address
     285            // TODO should be possible without detour over strings
     286            char straddr[INET6_ADDRSTRLEN];
     287            tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
     288            inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     289
     290            address_v6 v6addr = address_v6::from_string(straddr);
     291            if ( v6addr.is_link_local() )
     292            {
     293                // * append the scope_id to the return vector *
     294                ret.push_back(if_nametoindex(i->ifa_name));
     295            }
     296
     297        }
     298    }
     299
     300    freeifaddrs(ifaceBuffer);
     301   
     302    return ret;
     303}
     304
     305
     306/*****************
     307 ** inner class **
     308 *****************/
     309
     310tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent)  :
     311        sock(io_service),
     312        valid(true),
     313        parent(parent),
     314        out_queues(8), //TODO How much priorities shall we have?
     315        sending(false)
     316{
     317        header.length = 0;
     318        header.prot = 0;
     319}
     320
     321/*-------------------------------------------
     322 | implement transport_connection interface |
     323 -------------------------------------------*/
     324void tcpip::tcpip_connection::send(
     325        reboost::message_t message,
     326        uint8_t priority)
     327{
     328    enqueue_for_sending(message, priority);
     329}
     330
     331
     332address_vf tcpip::tcpip_connection::getLocalEndpoint()
     333{
     334    return local;
     335}
     336
     337
     338address_vf tcpip::tcpip_connection::getRemoteEndpoint()
     339{
     340    return remote;
     341}
     342
     343
     344void tcpip::tcpip_connection::terminate()
     345{
     346    parent->terminate(partner);
     347}
     348
     349
     350/*------------------------------
     351 | things we defined ourselves |
     352 ------------------------------*/
     353void tcpip::tcpip_connection::async_connect_handler(const error_code& error)
     354{
     355    if (error)
     356    {
     357        parent->terminate(partner);
     358
     359        return;
     360    }
     361   
     362    // save address in ariba format
     363    local = parent->convert_address(sock.local_endpoint());
     364   
     365    // Note: sending has to be true at this point
     366    send_next_package();
     367   
     368    listen();
     369}
     370
     371
     372void tcpip::tcpip_connection::listen()
     373{
     374    boost::asio::async_read(
     375            this->sock,
     376            boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
     377            boost::bind(
     378                    &tcpip::tcpip_connection::async_read_header_handler,
     379                    this->shared_from_this(),
     380                    boost::asio::placeholders::error,
     381                    boost::asio::placeholders::bytes_transferred
     382            )
     383    );
     384}
     385
     386
     387void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
     388{
     389    if (error)
     390    {
     391        parent->terminate(partner);
     392
     393        return;
     394    }
     395
     396    // convert byte order
     397    header.length = ntohl(header.length);
     398    header.length -= 2;  // XXX protlib
     399   
     400    assert(header.length > 0);
     401   
     402    // new buffer for the new packet
     403    buffy = shared_buffer_t(header.length);
     404
     405    // * read data *
     406    boost::asio::async_read(
     407            this->sock,
     408            boost::asio::buffer(buffy.mutable_data(), buffy.size()),
     409            boost::bind(
     410                    &tcpip::tcpip_connection::async_read_data_handler,
     411                    this->shared_from_this(),
     412                    boost::asio::placeholders::error,
     413                    boost::asio::placeholders::bytes_transferred
     414            )
     415    );
     416}
     417
     418void tcpip::tcpip_connection::async_read_data_handler(
     419        const error_code& error, size_t bytes_transferred)
     420{
     421    if (error)
     422    {
     423        parent->terminate(partner);
     424
     425        return;
     426    }
     427   
     428    message_t msg;
     429    msg.push_back(buffy);
     430    buffy = shared_buffer_t();
     431
     432    if ( parent->listener )
     433        parent->listener->receive_message(shared_from_this(), msg);
     434   
     435    listen();
     436}
     437
     438/* see header file for comments */
     439void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
     440{
     441    if ( error )
     442    {       
     443        // remove this connection
     444        parent->terminate(partner);
     445
     446        return;
     447    }
     448   
     449    send_next_package();
     450}
     451
     452
     453
     454void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority)
     455{
     456    bool restart_sending = false;
     457   
     458    // enqueue packet  [locked]
     459    {
     460        unique_lock(out_queues_lock);
     461       
     462        assert( priority < out_queues.size() );
     463        out_queues[priority].push(packet);
     464       
     465        if ( ! sending )
     466        {
     467            restart_sending = true;
     468            sending = true;
     469        }
     470    }
     471   
     472    // if sending was stopped, we have to restart it here
     473    if ( restart_sending )
     474    {
     475        send_next_package();
     476    }
     477}
     478
     479/* see header file for comments */
     480void tcpip::tcpip_connection::send_next_package()
     481{
     482    Packet packet;
     483    bool found = false;
     484
     485    // find packet with highest priority  [locked]
     486    {
     487        unique_lock(out_queues_lock);
     488       
     489        for ( vector<OutQueue>::iterator it = out_queues.begin();
     490                it != out_queues.end(); it++ )
     491        {
     492            if ( !it->empty() )
     493            {
     494                packet = it->front();
     495                it->pop();
     496                found = true;
     497               
     498                break;
     499            }
     500        }
     501       
     502        // no packets waiting --> stop sending
     503        if ( ! found )
     504        {
     505            sending = false;
     506        }
     507    }
     508   
     509    // * send *
     510    if ( found )
     511    {
     512        reboost::shared_buffer_t header_buf(sizeof(header_t));
     513        header_t* header = (header_t*)(header_buf.mutable_data());
     514        header->length = htonl(packet.size()+2);  // XXX protlib
     515       
     516        packet.push_front(header_buf);
     517       
     518        // "convert" message to asio buffer sequence
     519        vector<boost::asio::const_buffer> send_sequence(packet.length());
     520        for ( int i=0; i < packet.length(); i++ )
     521        {
     522            shared_buffer_t b = packet.at(i);
     523            send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
     524        }
     525       
     526        // * async write *
     527        boost::asio::async_write(
     528                this->sock,
     529                send_sequence,
     530                boost::bind(
     531                        &tcpip::tcpip_connection::async_write_handler,
     532                        this->shared_from_this(),
     533                        packet,  // makes sure our shared pointer lives long enough ;-)
     534                        boost::asio::placeholders::error,
     535                        boost::asio::placeholders::bytes_transferred)
     536        );
     537    }
    211538}
    212539
Note: See TracChangeset for help on using the changeset viewer.