source: source/ariba/utility/transport/StreamTransport/StreamTransport.hpp@ 12773

Last change on this file since 12773 was 12773, checked in by hock@…, 11 years ago

WARNING! This revision is not intended for productive use!

!! DEBUGGING ONLY !!

Extreme debugging of StreamTransport.
"Typo" in the locking-code, was very hard to track down!

This revision is stored since a lot dabug code was written and should not just be deleted.

File size: 6.6 KB
Line 
1#ifndef STREAM_TRANSPORT_HPP_
2#define STREAM_TRANSPORT_HPP_
3
4// ariba
5#include "ariba/utility/transport/asio/unique_io_service.h"
6#include "ariba/utility/transport/messages/buffers.hpp"
7//#include "ariba/utility/logging/Logging.h"
8#include "ariba/utility/addressing2/endpoint.hpp"
9
10// ariba (transport) interfaces
11#include "ariba/utility/transport/interfaces/transport_connection.hpp"
12#include "ariba/utility/transport/interfaces/transport_protocol.hpp"
13#include "ariba/utility/transport/interfaces/transport_listener.hpp"
14
15// system
16#include <queue>
17
18// boost
19#include <boost/asio.hpp>
20#include <boost/shared_ptr.hpp>
21#include <boost/enable_shared_from_this.hpp>
22
23
24namespace ariba {
25namespace transport {
26
27using namespace std;
28using boost::shared_ptr;
29using ariba::transport::detail::unique_io_service;
30using boost::system::error_code;
31using reboost::shared_buffer_t;
32using reboost::message_t;
33
34template <class T>
35class StreamTransport :
36 public transport_protocol,
37 public boost::enable_shared_from_this<StreamTransport<T> >
38{
39 typedef StreamTransport<T> self;
40 typedef shared_ptr<self> StreamTransportPtr;
41//use_logging_h(StreamTransport<T>)
42
43
44
45
46private:
47
48/*****************
49 ** inner class **
50 *****************/
51 class StreamConnection :
52 public transport_connection,
53 public boost::enable_shared_from_this<StreamConnection>
54 {
55 public:
56 typedef reboost::message_t Packet;
57 typedef std::queue<Packet> OutQueue;
58
59 struct header_t
60 {
61 uint16_t length;
62 } __attribute__((packed));
63
64 StreamConnection(boost::asio::io_service& io_service, StreamTransportPtr parent);
65
66 virtual ~StreamConnection()
67 {
68 // XXX MARIO Debugging
69 std::cout << "/// ~StreamConnection(): " << this << ", SENDING: " << this->sending << ", VALID: " << this->valid << std::endl;
70
71 assert ( this->valid == false );
72 }
73
74 /// Inherited from transport_connection
75 // Thread: ARIBA
76 virtual bool send(reboost::message_t message, uint8_t priority = 0);
77 // Thread: ARIBA
78 virtual ariba::addressing2::EndpointPtr getLocalEndpoint();
79 // Thread: ARIBA
80 virtual ariba::addressing2::EndpointPtr getRemoteEndpoint();
81 // Thread: ARIBA
82 virtual void register_communication_link(ariba::utility::LinkID* link);
83 // Thread: ARIBA
84 virtual void unregister_communication_link(ariba::utility::LinkID* link);
85 // Thread: ARIBA
86 virtual std::vector<ariba::utility::LinkID*> get_communication_links();
87
88 // Thread: BOTH
89 virtual void terminate();
90
91 void listen();
92
93 void async_connect_handler(const error_code& error);
94
95 void async_read_header_handler(const error_code& error, size_t bytes_transferred);
96 void async_read_data_handler(const error_code& error, size_t bytes_transferred);
97
98 /*
99 * is called from asio when write operation "returns",
100 * calls private function `send_next_package()`
101 */
102 void async_write_handler(
103 reboost::shared_buffer_t packet,
104 const error_code& error,
105 size_t bytes_transferred);
106
107
108 void enqueue_for_sending(Packet packet, uint8_t priority);
109
110 private:
111 /*
112 * is called from `send` or `async_write_handler` to begin/keep sending
113 * sends the next message with the highest priority in this connection
114 */
115 void send_next_package();
116
117 public:
118 typename T::socket sock;
119 bool valid;
120 StreamTransportPtr parent;
121
122 typename T::endpoint partner_endpoint;
123 typename T::endpoint local_endpoint;
124// address_v* remote;
125// address_v* local;
126
127 vector<OutQueue> out_queues; // to be locked with out_queues_lock
128 boost::mutex out_queues_lock;
129
130 bool sending; // to be locked with out_queues_lock
131
132 header_t header;
133 shared_buffer_t buffy;
134
135
136 // XXX Mario: Debugging
137 const int MAGIC_NUMBER;
138 private:
139 std::vector<ariba::utility::LinkID*> communication_links;
140 };
141 typedef boost::shared_ptr<StreamConnection> ConnPtr;
142 typedef std::map<typename T::endpoint, ConnPtr> ConnectionMap;
143/*********************
144 ** [ inner class ] **
145 *********************/
146
147
148
149
150public:
151 StreamTransport( const typename T::endpoint& endp );
152 virtual ~StreamTransport();
153 virtual void start();
154 virtual void stop();
155
156 /**
157 * enqueues message for sending
158 * create new connection if necessary
159 * starts sending mechanism (if not already running)
160 */
161 void send(
162 const typename T::endpoint&,
163 reboost::message_t message,
164 uint8_t priority = 0 );
165
166 /**
167 * Converts address_v to tcp::endpoint and calls the real send() function
168 */
169 virtual void send(
170 const addressing2::EndpointPtr remote,
171 reboost::message_t message,
172 uint8_t priority = 0 );
173
174 /**
175 * calls send for each destination endpoint in `endpoints`
176 */
177 virtual void send(
178 const addressing2::const_EndpointSetPtr endpoints,
179 reboost::message_t message,
180 uint8_t priority = 0 );
181
182 // XXX DEPRECATED
183 virtual void terminate( addressing2::EndpointPtr remote );
184
185 virtual void terminate( const typename T::endpoint& remote );
186 virtual void register_listener( transport_listener* listener );
187
188private:
189 void accept();
190 void async_accept_handler(ConnPtr conn, const error_code& error);
191
192private:
193 transport_listener* listener;
194 unique_io_service u_io_service;
195 typename T::acceptor acceptor;
196
197 ConnectionMap connections;
198 boost::mutex connections_lock;
199};
200
201
202// aliases
203//typedef StreamTransport<boost::asio::ip::tcp> tcpip;
204//#ifdef HAVE_LIBBLUETOOTH
205// typedef StreamTransport<boost::asio::bluetooth::rfcomm> rfcomm_transport;
206//#endif /* HAVE_LIBBLUETOOTH */
207
208
209// XXX testing TODO natÃŒrlich brauchen wir das noch... oO
210///**
211// * Conversion between ASIO Adresses and ARIBA adresses
212// */
213//template <class T>
214//typename T::endpoint convert_address(const address_v* endpoint);
215//
216//template <class T>
217//address_v::shared_ptr convert_address(const typename T::endpoint& endpoint);
218
219
220
221/**
222 * returns a vector of (interesting) network interfaces
223 *
224 * [NOTE: The current implementation returns the scope_ids of
225 * all ethX and wlanX network interfaces, to be used for
226 * connections to link-local ipv6 addresses.]
227 *
228 * TODO move to ariba/communication/networkinfo/AddressDiscovery ??
229 *
230 */
231vector<uint64_t> get_interface_scope_ids();
232
233
234}} // namespace ariba::transport
235
236#endif /* STREAM_TRANSPORT_HPP_ */
Note: See TracBrowser for help on using the repository browser.