close Warning: Can't use blame annotator:
No changeset 10924 in the repository

source: source/ariba/utility/transport/StreamTransport/StreamTransport.hpp@ 12775

Last change on this file since 12775 was 12774, checked in by hock@…, 11 years ago

StreamTransport bug fixed!!

[ Back to normal. :-) ]

File size: 6.4 KB
RevLine 
1#ifndef STREAM_TRANSPORT_HPP_
2#define STREAM_TRANSPORT_HPP_
3
4// ariba
5#include "ariba/utility/transport/asio/unique_io_service.h"
6#include "ariba/utility/transport/messages/buffers.hpp"
7//#include "ariba/utility/logging/Logging.h"
8#include "ariba/utility/addressing2/endpoint.hpp"
9
10// ariba (transport) interfaces
11#include "ariba/utility/transport/interfaces/transport_connection.hpp"
12#include "ariba/utility/transport/interfaces/transport_protocol.hpp"
13#include "ariba/utility/transport/interfaces/transport_listener.hpp"
14
15// system
16#include <queue>
17
18// boost
19#include <boost/asio.hpp>
20#include <boost/shared_ptr.hpp>
21#include <boost/enable_shared_from_this.hpp>
22
23
24namespace ariba {
25namespace transport {
26
27using namespace std;
28using boost::shared_ptr;
29using ariba::transport::detail::unique_io_service;
30using boost::system::error_code;
31using reboost::shared_buffer_t;
32using reboost::message_t;
33
34template <class T>
35class StreamTransport :
36 public transport_protocol,
37 public boost::enable_shared_from_this<StreamTransport<T> >
38{
39 typedef StreamTransport<T> self;
40 typedef shared_ptr<self> StreamTransportPtr;
41//use_logging_h(StreamTransport<T>)
42
43
44
45
46private:
47
48/*****************
49 ** inner class **
50 *****************/
51 class StreamConnection :
52 public transport_connection,
53 public boost::enable_shared_from_this<StreamConnection>
54 {
55 public:
56 typedef reboost::message_t Packet;
57 typedef std::queue<Packet> OutQueue;
58
59 struct header_t
60 {
61 uint16_t length;
62 } __attribute__((packed));
63
64 StreamConnection(boost::asio::io_service& io_service, StreamTransportPtr parent);
65
66 virtual ~StreamConnection()
67 {
68 assert ( this->valid == false );
69 }
70
71 /// Inherited from transport_connection
72 // Thread: ARIBA
73 virtual bool send(reboost::message_t message, uint8_t priority = 0);
74 // Thread: ARIBA
75 virtual ariba::addressing2::EndpointPtr getLocalEndpoint();
76 // Thread: ARIBA
77 virtual ariba::addressing2::EndpointPtr getRemoteEndpoint();
78 // Thread: ARIBA
79 virtual void register_communication_link(ariba::utility::LinkID* link);
80 // Thread: ARIBA
81 virtual void unregister_communication_link(ariba::utility::LinkID* link);
82 // Thread: ARIBA
83 virtual std::vector<ariba::utility::LinkID*> get_communication_links();
84
85 // Thread: BOTH
86 virtual void terminate();
87
88 void listen();
89
90 void async_connect_handler(const error_code& error);
91
92 void async_read_header_handler(const error_code& error, size_t bytes_transferred);
93 void async_read_data_handler(const error_code& error, size_t bytes_transferred);
94
95 /*
96 * is called from asio when write operation "returns",
97 * calls private function `send_next_package()`
98 */
99 void async_write_handler(
100 reboost::shared_buffer_t packet,
101 const error_code& error,
102 size_t bytes_transferred);
103
104
105 void enqueue_for_sending(Packet packet, uint8_t priority);
106
107 private:
108 /*
109 * is called from `send` or `async_write_handler` to begin/keep sending
110 * sends the next message with the highest priority in this connection
111 */
112 void send_next_package();
113
114 public:
115 typename T::socket sock;
116 bool valid;
117 StreamTransportPtr parent;
118
119 typename T::endpoint partner_endpoint;
120 typename T::endpoint local_endpoint;
121// address_v* remote;
122// address_v* local;
123
124 vector<OutQueue> out_queues; // to be locked with out_queues_lock
125 boost::mutex out_queues_lock;
126
127 bool sending; // to be locked with out_queues_lock
128
129 header_t header;
130 shared_buffer_t buffy;
131
132 private:
133 std::vector<ariba::utility::LinkID*> communication_links;
134 };
135 typedef boost::shared_ptr<StreamConnection> ConnPtr;
136 typedef std::map<typename T::endpoint, ConnPtr> ConnectionMap;
137/*********************
138 ** [ inner class ] **
139 *********************/
140
141
142
143
144public:
145 StreamTransport( const typename T::endpoint& endp );
146 virtual ~StreamTransport();
147 virtual void start();
148 virtual void stop();
149
150 /**
151 * enqueues message for sending
152 * create new connection if necessary
153 * starts sending mechanism (if not already running)
154 */
155 void send(
156 const typename T::endpoint&,
157 reboost::message_t message,
158 uint8_t priority = 0 );
159
160 /**
161 * Converts address_v to tcp::endpoint and calls the real send() function
162 */
163 virtual void send(
164 const addressing2::EndpointPtr remote,
165 reboost::message_t message,
166 uint8_t priority = 0 );
167
168 /**
169 * calls send for each destination endpoint in `endpoints`
170 */
171 virtual void send(
172 const addressing2::const_EndpointSetPtr endpoints,
173 reboost::message_t message,
174 uint8_t priority = 0 );
175
176 // XXX DEPRECATED
177 virtual void terminate( addressing2::EndpointPtr remote );
178
179 virtual void terminate( const typename T::endpoint& remote );
180 virtual void register_listener( transport_listener* listener );
181
182private:
183 void accept();
184 void async_accept_handler(ConnPtr conn, const error_code& error);
185
186private:
187 transport_listener* listener;
188 unique_io_service u_io_service;
189 typename T::acceptor acceptor;
190
191 ConnectionMap connections;
192 boost::mutex connections_lock;
193};
194
195
196// aliases
197//typedef StreamTransport<boost::asio::ip::tcp> tcpip;
198//#ifdef HAVE_LIBBLUETOOTH
199// typedef StreamTransport<boost::asio::bluetooth::rfcomm> rfcomm_transport;
200//#endif /* HAVE_LIBBLUETOOTH */
201
202
203// XXX testing TODO natürlich brauchen wir das noch... oO
204///**
205// * Conversion between ASIO Adresses and ARIBA adresses
206// */
207//template <class T>
208//typename T::endpoint convert_address(const address_v* endpoint);
209//
210//template <class T>
211//address_v::shared_ptr convert_address(const typename T::endpoint& endpoint);
212
213
214
/**
 * Returns a vector of (interesting) network interfaces.
 *
 * [NOTE: The current implementation returns the scope_ids of
 *  all ethX and wlanX network interfaces, to be used for
 *  connections to link-local ipv6 addresses.]
 *
 * TODO move to ariba/communication/networkinfo/AddressDiscovery ??
 *
 * @return the scope ids described in the note above
 */
std::vector<uint64_t> get_interface_scope_ids();
226
227
228}} // namespace ariba::transport
229
230#endif /* STREAM_TRANSPORT_HPP_ */
Note: See TracBrowser for help on using the repository browser.