#ifndef STREAM_TRANSPORT_HPP_ #define STREAM_TRANSPORT_HPP_ // ariba #include "ariba/utility/transport/asio/unique_io_service.h" #include "ariba/utility/transport/messages/buffers.hpp" //#include "ariba/utility/logging/Logging.h" #include "ariba/utility/addressing2/endpoint.hpp" // ariba (transport) interfaces #include "ariba/utility/transport/interfaces/transport_connection.hpp" #include "ariba/utility/transport/interfaces/transport_protocol.hpp" #include "ariba/utility/transport/interfaces/transport_listener.hpp" // system #include // boost #include #include #include namespace ariba { namespace transport { using namespace std; using boost::shared_ptr; using ariba::transport::detail::unique_io_service; using boost::system::error_code; using reboost::shared_buffer_t; using reboost::message_t; template class StreamTransport : public transport_protocol, public boost::enable_shared_from_this > { typedef StreamTransport self; typedef shared_ptr StreamTransportPtr; //use_logging_h(StreamTransport) private: /***************** ** inner class ** *****************/ class StreamConnection : public transport_connection, public boost::enable_shared_from_this { public: typedef reboost::message_t Packet; typedef std::queue OutQueue; struct header_t { uint16_t length; } __attribute__((packed)); StreamConnection(boost::asio::io_service& io_service, StreamTransportPtr parent); virtual ~StreamConnection() { assert ( this->valid == false ); } /// Inherited from transport_connection // Thread: ARIBA virtual bool send(reboost::message_t message, uint8_t priority = 0); // Thread: ARIBA virtual ariba::addressing2::EndpointPtr getLocalEndpoint(); // Thread: ARIBA virtual ariba::addressing2::EndpointPtr getRemoteEndpoint(); // Thread: ARIBA virtual void register_communication_link(ariba::utility::LinkID* link); // Thread: ARIBA virtual void unregister_communication_link(ariba::utility::LinkID* link); // Thread: ARIBA virtual std::vector get_communication_links(); // Thread: BOTH virtual void terminate(); void listen(); void async_connect_handler(const error_code& error); void async_read_header_handler(const error_code& error, size_t bytes_transferred); void async_read_data_handler(const error_code& error, size_t bytes_transferred); /* * is called from asio when write operation "returns", * calls private function `send_next_package()` */ void async_write_handler( reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred); void enqueue_for_sending(Packet packet, uint8_t priority); private: /* * is called from `send` or `async_write_handler` to begin/keep sending * sends the next message with the highest priority in this connection */ void send_next_package(); public: typename T::socket sock; bool valid; StreamTransportPtr parent; typename T::endpoint partner_endpoint; typename T::endpoint local_endpoint; // address_v* remote; // address_v* local; vector out_queues; // to be locked with out_queues_lock boost::mutex out_queues_lock; bool sending; // to be locked with out_queues_lock header_t header; shared_buffer_t buffy; private: std::vector communication_links; }; typedef boost::shared_ptr ConnPtr; typedef std::map ConnectionMap; /********************* ** [ inner class ] ** *********************/ public: StreamTransport( const typename T::endpoint& endp ); virtual ~StreamTransport(); virtual void start(); virtual void stop(); /** * enqueues message for sending * create new connection if necessary * starts sending mechanism (if not already running) */ void send( const typename T::endpoint&, reboost::message_t message, uint8_t priority = 0 ); /** * Converts address_v to tcp::endpoint and calls the real send() function */ virtual void send( const addressing2::EndpointPtr remote, reboost::message_t message, uint8_t priority = 0 ); /** * calls send for each destination endpoint in `endpoints` */ virtual void send( const addressing2::const_EndpointSetPtr endpoints, reboost::message_t message, uint8_t priority = 0 ); // XXX DEPRECATED virtual void terminate( addressing2::EndpointPtr remote ); virtual void terminate( const typename T::endpoint& remote ); virtual void register_listener( transport_listener* listener ); private: void accept(); void async_accept_handler(ConnPtr conn, const error_code& error); private: transport_listener* listener; unique_io_service u_io_service; typename T::acceptor acceptor; ConnectionMap connections; boost::mutex connections_lock; }; // aliases //typedef StreamTransport tcpip; //#ifdef HAVE_LIBBLUETOOTH // typedef StreamTransport rfcomm_transport; //#endif /* HAVE_LIBBLUETOOTH */ // XXX testing TODO natürlich brauchen wir das noch... oO ///** // * Conversion between ASIO Adresses and ARIBA adresses // */ //template //typename T::endpoint convert_address(const address_v* endpoint); // //template //address_v::shared_ptr convert_address(const typename T::endpoint& endpoint); /** * returns a vector of (interesting) network interfaces * * [NOTE: The current implementation returns the scope_ids of * all ethX and wlanX network interfaces, to be used for * connections to link-local ipv6 addresses.] * * TODO move to ariba/communication/networkinfo/AddressDiscovery ?? * */ vector get_interface_scope_ids(); }} // namespace ariba::transport #endif /* STREAM_TRANSPORT_HPP_ */