Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (12 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

Location:
source/ariba/utility/transport
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/tcpip/tcpip.hpp

    r5993 r10653  
    33
    44#include "ariba/utility/transport/transport.hpp"
    5 #include <pthread.h>
    6 
    7 // forward declaration
    8 namespace protlib {
    9 template<class X, class Y>
    10 class ThreadStarter;
    11 class TPoverTCP;
    12 class TPoverTCPParam;
    13 }
     5#include "ariba/utility/transport/asio/unique_io_service.h"
     6#include "ariba/utility/transport/transport_connection.hpp"
     7#include "ariba/utility/addressing/tcpip_endpoint.hpp"
     8#include <boost/asio.hpp>
     9#include <boost/shared_ptr.hpp>
     10#include <boost/enable_shared_from_this.hpp>
     11#include <queue>
     12#include "ariba/utility/transport/messages/buffers.hpp"
     13#include "ariba/utility/logging/Logging.h"
    1414
    1515namespace ariba {
    1616namespace transport {
    1717
    18 using namespace protlib;
     18using namespace std;
     19using ariba::transport::detail::unique_io_service;
     20using ariba::addressing::tcpip_endpoint;
     21using boost::asio::ip::tcp;
     22using boost::asio::ip::address_v6;
     23using boost::system::error_code;
     24using reboost::shared_buffer_t;
     25using reboost::message_t;
    1926
    20 /**
    21  * TODO: Doc
    22  *
    23  * @author Sebastian Mies <mies@tm.uka.de>
    24  */
    25 class tcpip : public transport_protocol {
     27class tcpip;
     28typedef boost::shared_ptr<tcpip> TcpIpPtr;
     29
     30class tcpip :
     31    public transport_protocol,
     32    public boost::enable_shared_from_this<tcpip>
     33{
     34    typedef tcpip self;
     35use_logging_h(tcpip)
     36
     37private:
     38    class tcpip_connection :
     39        public transport_connection,
     40        public boost::enable_shared_from_this<tcpip_connection>
     41    {
     42    public:
     43        typedef reboost::message_t Packet;
     44        typedef std::queue<Packet> OutQueue;
     45       
     46        struct header_t
     47        {
     48            uint32_t length;
     49            uint16_t prot;  // XXX protlib
     50        } __attribute__((packed));
     51           
     52        tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent);
     53       
     54        /// Inherited from transport_connection
     55        virtual void send(reboost::message_t message, uint8_t priority = 0);
     56        virtual address_vf getLocalEndpoint();
     57        virtual address_vf getRemoteEndpoint();
     58        virtual void terminate();
     59       
     60        void listen();
     61       
     62        void async_connect_handler(const error_code& error);
     63       
     64        void async_read_header_handler(const error_code& error, size_t bytes_transferred);
     65        void async_read_data_handler(const error_code& error, size_t bytes_transferred);
     66       
     67        /*
     68         * is called from asio when write operation "returns",
     69         * calls private function `send_next_package()`
     70         */
     71        void async_write_handler(
     72                reboost::shared_buffer_t packet,
     73                const error_code& error,
     74                size_t bytes_transferred);
     75
     76       
     77        void enqueue_for_sending(Packet packet, uint8_t priority);
     78       
     79    private:
     80        /*
     81         * is called from `send` or `async_write_handler` to begin/keep sending
     82         * sends the next message with the highest priority in this connection
     83         */
     84        void send_next_package();
     85
     86
     87    public:
     88        tcp::socket sock;
     89        bool valid;
     90        TcpIpPtr parent;
     91       
     92        tcp::endpoint partner;
     93        tcpip_endpoint remote;
     94        tcpip_endpoint local;
     95       
     96        vector<OutQueue> out_queues;     // to be locked with out_queues_lock
     97        boost::mutex out_queues_lock;
     98       
     99        bool sending;       // to be locked with out_queues_lock
     100       
     101        header_t header;
     102        shared_buffer_t buffy;
     103    };
     104    typedef boost::shared_ptr<tcpip_connection> ConnPtr;
     105    typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap;
     106   
    26107public:
    27         tcpip( uint16_t port );
     108        tcpip( const tcp::endpoint& endp );
    28109        virtual ~tcpip();
    29110        virtual void start();
    30111        virtual void stop();
    31         virtual void send( const address_v* remote, const uint8_t* data, size_t size );
    32         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size );
     112       
     113        /**
     114     * enqueues message for sending
     115     * create new connection if necessary
     116     * starts sending mechanism (if not already running)
     117     */
     118    void send(
     119            const tcp::endpoint&,
     120            reboost::message_t message,
     121            uint8_t priority = 0 );
     122       
     123        /**
     124         * Converts address_v to tcp::endpoint and calls the real send() function
     125         */
     126        virtual void send(
     127                const address_v* remote,
     128                reboost::message_t message,
     129                uint8_t priority = 0 );
     130       
     131        /**
     132         * calls send for each destination endpoint in `endpoint_set& endpoints`
     133         */
     134        virtual void send(
     135                const endpoint_set& endpoints,
     136                reboost::message_t message,
     137                uint8_t priority = 0 );
     138       
    33139        virtual void terminate( const address_v* remote );
     140        virtual void terminate( const tcp::endpoint& remote );
    34141        virtual void register_listener( transport_listener* listener );
    35142
     143       
     144    /**
     145     *  returns a vector of (interesting) network interfaces
     146     * 
     147     *  [NOTE: The current implementation returns the scope_ids of
     148     *  all ethX and wlanX network interfaces, to be used for
     149     *  connections to link-local ipv6 addresses.]
     150     * 
     151     *  TODO move to ariba/communication/networkinfo/AddressDiscovery ??
     152     * 
     153     */
     154    static vector<uint64_t> get_interface_scope_ids();
     155
    36156private:
    37         volatile bool done, running;
    38         uint16_t port;
    39         pthread_t tpreceivethread;
    40         ThreadStarter<TPoverTCP, TPoverTCPParam>* tpthread;
    41         static void* receiverThread( void* ptp );
     157        void accept();
     158        void async_accept_handler(ConnPtr conn, const error_code& error);
     159        tcp::endpoint convert_address(const address_v* endpoint);
     160        tcpip_endpoint convert_address(const tcp::endpoint& endpoint);
     161       
     162private:
    42163        transport_listener* listener;
     164        unique_io_service u_io_service;
     165        tcp::acceptor acceptor;
     166       
     167        ConnectionMap connections;
     168        boost::mutex connections_lock;
    43169};
    44170
Note: See TracChangeset for help on using the changeset viewer.