close Warning: Can't use blame annotator:
No changeset 10924 in the repository

source: source/ariba/utility/transport/StreamTransport/StreamTransport.hpp@ 12747

Last change on this file since 12747 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 6.4 KB
RevLine 
1#ifndef STREAM_TRANSPORT_HPP_
2#define STREAM_TRANSPORT_HPP_
3
4// ariba
5#include "ariba/utility/transport/asio/unique_io_service.h"
6#include "ariba/utility/transport/messages/buffers.hpp"
7//#include "ariba/utility/logging/Logging.h"
8#include "ariba/utility/addressing2/endpoint.hpp"
9
10// ariba (transport) interfaces
11#include "ariba/utility/transport/interfaces/transport_connection.hpp"
12#include "ariba/utility/transport/interfaces/transport_protocol.hpp"
13#include "ariba/utility/transport/interfaces/transport_listener.hpp"
14
15// system
16#include <queue>
17
18// boost
19#include <boost/asio.hpp>
20#include <boost/shared_ptr.hpp>
21#include <boost/enable_shared_from_this.hpp>
22
23
24namespace ariba {
25namespace transport {
26
27using namespace std;
28using boost::shared_ptr;
29using ariba::transport::detail::unique_io_service;
30using boost::system::error_code;
31using reboost::shared_buffer_t;
32using reboost::message_t;
33
34template <class T>
35class StreamTransport :
36 public transport_protocol,
37 public boost::enable_shared_from_this<StreamTransport<T> >
38{
39 typedef StreamTransport<T> self;
40 typedef shared_ptr<self> StreamTransportPtr;
41//use_logging_h(StreamTransport<T>)
42
43
44
45
46private:
47
48/*****************
49 ** inner class **
50 *****************/
51 class StreamConnection :
52 public transport_connection,
53 public boost::enable_shared_from_this<StreamConnection>
54 {
55 public:
56 typedef reboost::message_t Packet;
57 typedef std::queue<Packet> OutQueue;
58
59 struct header_t
60 {
61 uint16_t length;
62 } __attribute__((packed));
63
64 StreamConnection(boost::asio::io_service& io_service, StreamTransportPtr parent);
65
66 virtual ~StreamConnection() {}
67
68 /// Inherited from transport_connection
69 // Thread: ARIBA
70 virtual bool send(reboost::message_t message, uint8_t priority = 0);
71 // Thread: ARIBA
72 virtual ariba::addressing2::EndpointPtr getLocalEndpoint();
73 // Thread: ARIBA
74 virtual ariba::addressing2::EndpointPtr getRemoteEndpoint();
75 // Thread: ARIBA
76 virtual void register_communication_link(ariba::utility::LinkID* link);
77 // Thread: ARIBA
78 virtual void unregister_communication_link(ariba::utility::LinkID* link);
79 // Thread: ARIBA
80 virtual std::vector<ariba::utility::LinkID*> get_communication_links();
81
82 // Thread: BOTH
83 virtual void terminate();
84
85 void listen();
86
87 void async_connect_handler(const error_code& error);
88
89 void async_read_header_handler(const error_code& error, size_t bytes_transferred);
90 void async_read_data_handler(const error_code& error, size_t bytes_transferred);
91
92 /*
93 * is called from asio when write operation "returns",
94 * calls private function `send_next_package()`
95 */
96 void async_write_handler(
97 reboost::shared_buffer_t packet,
98 const error_code& error,
99 size_t bytes_transferred);
100
101
102 void enqueue_for_sending(Packet packet, uint8_t priority);
103
104 private:
105 /*
106 * is called from `send` or `async_write_handler` to begin/keep sending
107 * sends the next message with the highest priority in this connection
108 */
109 void send_next_package();
110
111 public:
112 typename T::socket sock;
113 bool valid;
114 StreamTransportPtr parent;
115
116 typename T::endpoint partner_endpoint;
117 typename T::endpoint local_endpoint;
118// address_v* remote;
119// address_v* local;
120
121 vector<OutQueue> out_queues; // to be locked with out_queues_lock
122 boost::mutex out_queues_lock;
123
124 bool sending; // to be locked with out_queues_lock
125
126 header_t header;
127 shared_buffer_t buffy;
128
129 private:
130 std::vector<ariba::utility::LinkID*> communication_links;
131 };
132 typedef boost::shared_ptr<StreamConnection> ConnPtr;
133 typedef std::map<typename T::endpoint, ConnPtr> ConnectionMap;
134/*********************
135 ** [ inner class ] **
136 *********************/
137
138
139
140
141public:
142 StreamTransport( const typename T::endpoint& endp );
143 virtual ~StreamTransport();
144 virtual void start();
145 virtual void stop();
146
147 /**
148 * enqueues message for sending
149 * create new connection if necessary
150 * starts sending mechanism (if not already running)
151 */
152 void send(
153 const typename T::endpoint&,
154 reboost::message_t message,
155 uint8_t priority = 0 );
156
157 /**
158 * Converts address_v to tcp::endpoint and calls the real send() function
159 */
160 virtual void send(
161 const addressing2::EndpointPtr remote,
162 reboost::message_t message,
163 uint8_t priority = 0 );
164
165 /**
166 * calls send for each destination endpoint in `endpoints`
167 */
168 virtual void send(
169 const addressing2::const_EndpointSetPtr endpoints,
170 reboost::message_t message,
171 uint8_t priority = 0 );
172
173 // XXX DEPRECATED
174 virtual void terminate( addressing2::EndpointPtr remote );
175
176 virtual void terminate( const typename T::endpoint& remote );
177 virtual void register_listener( transport_listener* listener );
178
179private:
180 void accept();
181 void async_accept_handler(ConnPtr conn, const error_code& error);
182
183private:
184 transport_listener* listener;
185 unique_io_service u_io_service;
186 typename T::acceptor acceptor;
187
188 ConnectionMap connections;
189 boost::mutex connections_lock;
190};
191
192
193// aliases
194//typedef StreamTransport<boost::asio::ip::tcp> tcpip;
195//#ifdef HAVE_LIBBLUETOOTH
196// typedef StreamTransport<boost::asio::bluetooth::rfcomm> rfcomm_transport;
197//#endif /* HAVE_LIBBLUETOOTH */
198
199
200// XXX testing TODO natÃŒrlich brauchen wir das noch... oO
201///**
202// * Conversion between ASIO Adresses and ARIBA adresses
203// */
204//template <class T>
205//typename T::endpoint convert_address(const address_v* endpoint);
206//
207//template <class T>
208//address_v::shared_ptr convert_address(const typename T::endpoint& endpoint);
209
210
211
212/**
213 * returns a vector of (interesting) network interfaces
214 *
215 * [NOTE: The current implementation returns the scope_ids of
216 * all ethX and wlanX network interfaces, to be used for
217 * connections to link-local ipv6 addresses.]
218 *
219 * TODO move to ariba/communication/networkinfo/AddressDiscovery ??
220 *
221 */
222vector<uint64_t> get_interface_scope_ids();
223
224
225}} // namespace ariba::transport
226
227#endif /* STREAM_TRANSPORT_HPP_ */
Note: See TracBrowser for help on using the repository browser.