close Warning: Can't use blame annotator:
No changeset 10624 in the repository

source: source/ariba/utility/transport/rfcomm/rfcomm_transport.cpp@ 10788

Last change on this file since 10788 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 11.3 KB
RevLine 
1#include "rfcomm_transport.hpp"
2
3#ifdef HAVE_LIBBLUETOOTH
4#include <boost/array.hpp>
5
6namespace ariba {
7namespace transport {
8
9use_logging_cpp(rfcomm_transport)
10
11using namespace ariba::addressing;
12
13typedef boost::mutex::scoped_lock unique_lock;
14
15/* constructor */
16rfcomm_transport::rfcomm_transport( const rfcomm_transport::rfcomm::endpoint& endp ) :
17 listener(NULL),
18 acceptor(u_io_service.get_asio_io_service(), endp)
19{
20}
21
22rfcomm_transport::~rfcomm_transport(){}
23
24void rfcomm_transport::start()
25{
26 // open server socket
27 accept();
28
29 u_io_service.start();
30}
31
32
33void rfcomm_transport::stop()
34{
35 acceptor.close();
36
37 u_io_service.stop();
38}
39
40
41/* see header file for comments */
42void rfcomm_transport::send(
43 const rfcomm::endpoint& dest_addr,
44 reboost::message_t message,
45 uint8_t priority)
46{
47 ConnPtr conn;
48 bool need_to_connect = false;
49
50 {
51 unique_lock lock(connections_lock);
52
53 ConnectionMap::iterator it = connections.find(dest_addr);
54 if (it == connections.end())
55 {
56 ConnPtr tmp_ptr(
57 new rfcomm_connection(
58 u_io_service.get_asio_io_service(),
59 shared_from_this() )
60 );
61 conn = tmp_ptr;
62
63 conn->partner = dest_addr;
64 conn->remote = convert_address(dest_addr);
65
66 // Note: starting the send is the obligation of the connect_handler
67 // (avoids trying to send while not connected yet)
68 conn->sending = true;
69 need_to_connect = true;
70
71 ConnectionMap::value_type item(dest_addr, conn);
72 connections.insert(item);
73
74 } else {
75 conn = it->second;
76 }
77 }
78
79
80 // * the actual send *
81 conn->enqueue_for_sending(message, priority);
82
83 // if new connection connect to the other party
84 if ( need_to_connect )
85 {
86 conn->sock.async_connect(
87 dest_addr,
88 boost::bind(
89 &rfcomm_connection::async_connect_handler,
90 conn,
91 boost::asio::placeholders::error));
92 }
93}
94
95
96/* see header file for comments */
97void rfcomm_transport::send(
98 const address_v* remote,
99 reboost::message_t message,
100 uint8_t priority)
101{
102 send(convert_address(remote), message, priority);
103}
104
105
106/* see header file for comments */
107void rfcomm_transport::send(
108 const endpoint_set& endpoints,
109 reboost::message_t message,
110 uint8_t priority )
111{
112 // send a message to each combination of address-address and port
113 BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
114 BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
115 rfcomm::endpoint endp(mac.bluetooth(), channel.value());
116
117 // * send *
118 send(endp, message, priority);
119 }
120 }
121}
122
123
124void rfcomm_transport::register_listener( transport_listener* listener )
125{
126 this->listener = listener;
127}
128
129
130void rfcomm_transport::terminate( const address_v* remote )
131{
132 terminate(convert_address(remote));
133}
134
135void rfcomm_transport::terminate( const rfcomm::endpoint& remote )
136{
137 ConnPtr conn;
138
139 // find and forget connection
140 {
141 unique_lock lock(connections_lock);
142
143 ConnectionMap::iterator it = connections.find(remote);
144 if (it == connections.end())
145 {
146 return;
147 }
148
149 conn = it->second;
150
151 connections.erase(it);
152 }
153
154 // close connection
155 boost::system::error_code ec;
156 conn->sock.shutdown(tcp::socket::shutdown_both, ec);
157 conn->sock.close(ec);
158}
159
160
161/* private */
162void rfcomm_transport::accept()
163{
164 // create new connection object
165 ConnPtr conn(
166 new rfcomm_connection(
167 u_io_service.get_asio_io_service(),
168 shared_from_this()
169 )
170 );
171
172 // wait for incoming connection
173 acceptor.async_accept(
174 conn->sock,
175 boost::bind(&self::async_accept_handler,
176 this->shared_from_this(),
177 conn,
178 boost::asio::placeholders::error)
179 );
180}
181
182void rfcomm_transport::async_accept_handler(ConnPtr conn, const error_code& error)
183{
184 if ( ! error )
185 {
186 conn->partner = conn->sock.remote_endpoint();
187 conn->remote = convert_address(conn->partner);
188 conn->local = convert_address(conn->sock.local_endpoint());
189
190 {
191 unique_lock lock(connections_lock);
192
193 ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
194 connections.insert(item);
195 }
196
197 // read
198 conn->listen();
199 }
200
201 // accept further connections
202 accept();
203}
204
205inline rfcomm_transport::rfcomm::endpoint
206rfcomm_transport::convert_address( const address_v* address )
207{
208 rfcomm_endpoint endpoint = *address;
209
210 return rfcomm::endpoint(
211 endpoint.mac().bluetooth(), endpoint.channel().value()
212 );
213}
214
215
216inline rfcomm_endpoint rfcomm_transport::convert_address(const rfcomm::endpoint& endpoint)
217{
218 mac_address mac;
219 mac.bluetooth(endpoint.address());
220 rfcomm_channel_address channel;
221 channel.value(endpoint.channel());
222 return rfcomm_endpoint(mac, channel);
223}
224
225
226/*****************
227 ** inner class **
228 *****************/
229
230rfcomm_transport::rfcomm_connection::rfcomm_connection(
231 boost::asio::io_service & io_service,
232 rfcomm_transport::sptr parent) :
233 sock(io_service),
234 valid(true),
235 parent(parent),
236 out_queues(8), //TODO How much priorities shall we have?
237 sending(false)
238{
239 header.length = 0;
240 header.prot = 0;
241}
242
243/*-------------------------------------------
244 | implement transport_connection interface |
245 -------------------------------------------*/
246void rfcomm_transport::rfcomm_connection::send(
247 reboost::message_t message,
248 uint8_t priority)
249{
250 enqueue_for_sending(message, priority);
251}
252
253
254address_vf rfcomm_transport::rfcomm_connection::getLocalEndpoint()
255{
256 return local;
257}
258
259
260address_vf rfcomm_transport::rfcomm_connection::getRemoteEndpoint()
261{
262 return remote;
263}
264
265
266void rfcomm_transport::rfcomm_connection::terminate()
267{
268 parent->terminate(partner);
269}
270
271
272/*------------------------------
273 | things we defined ourselves |
274 ------------------------------*/
275void rfcomm_transport::rfcomm_connection::async_connect_handler(const error_code& error)
276{
277 if (error)
278 {
279 parent->terminate(partner);
280
281 return;
282 }
283
284 // save address in ariba format
285 local = parent->convert_address(sock.local_endpoint());
286
287 // Note: sending has to be true at this point
288 send_next_package();
289
290 listen();
291}
292
293
294void rfcomm_transport::rfcomm_connection::listen()
295{
296 boost::asio::async_read(
297 this->sock,
298 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
299 boost::bind(
300 &rfcomm_transport::rfcomm_connection::async_read_header_handler,
301 this->shared_from_this(),
302 boost::asio::placeholders::error,
303 boost::asio::placeholders::bytes_transferred
304 )
305 );
306}
307
308
309void rfcomm_transport::rfcomm_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
310{
311 if (error)
312 {
313 parent->terminate(partner);
314
315 return;
316 }
317
318 // convert byte order
319 header.length = ntohl(header.length);
320 header.length -= 2; // XXX protlib
321
322 assert(header.length > 0);
323
324 // new buffer for the new packet
325 buffy = shared_buffer_t(header.length);
326
327 // * read data *
328 boost::asio::async_read(
329 this->sock,
330 boost::asio::buffer(buffy.mutable_data(), buffy.size()),
331 boost::bind(
332 &rfcomm_transport::rfcomm_connection::async_read_data_handler,
333 this->shared_from_this(),
334 boost::asio::placeholders::error,
335 boost::asio::placeholders::bytes_transferred
336 )
337 );
338}
339
340void rfcomm_transport::rfcomm_connection::async_read_data_handler(
341 const error_code& error, size_t bytes_transferred)
342{
343 if (error)
344 {
345 parent->terminate(partner);
346
347 return;
348 }
349
350 message_t msg;
351 msg.push_back(buffy);
352 buffy = shared_buffer_t();
353
354 if ( parent->listener )
355 parent->listener->receive_message(shared_from_this(), msg);
356
357 listen();
358}
359
360/* see header file for comments */
361void rfcomm_transport::rfcomm_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
362{
363 if ( error )
364 {
365 // remove this connection
366 parent->terminate(partner);
367
368 return;
369 }
370
371 send_next_package();
372}
373
374
375
376void rfcomm_transport::rfcomm_connection::enqueue_for_sending(Packet packet, uint8_t priority)
377{
378 bool restart_sending = false;
379
380 // enqueue packet [locked]
381 {
382 unique_lock(out_queues_lock);
383
384 assert( priority < out_queues.size() );
385 out_queues[priority].push(packet);
386
387 if ( ! sending )
388 {
389 restart_sending = true;
390 sending = true;
391 }
392 }
393
394 // if sending was stopped, we have to restart it here
395 if ( restart_sending )
396 {
397 send_next_package();
398 }
399}
400
401/* see header file for comments */
402void rfcomm_transport::rfcomm_connection::send_next_package()
403{
404 Packet packet;
405 bool found = false;
406
407 // find packet with highest priority [locked]
408 {
409 unique_lock(out_queues_lock);
410
411 for ( vector<OutQueue>::iterator it = out_queues.begin();
412 it != out_queues.end(); it++ )
413 {
414 if ( !it->empty() )
415 {
416 packet = it->front();
417 it->pop();
418 found = true;
419
420 break;
421 }
422 }
423
424 // no packets waiting --> stop sending
425 if ( ! found )
426 {
427 sending = false;
428 }
429 }
430
431 // * send *
432 if ( found )
433 {
434 reboost::shared_buffer_t header_buf(sizeof(header_t));
435 header_t* header = (header_t*)(header_buf.mutable_data());
436 header->length = htonl(packet.size()+2); // XXX protlib
437
438 packet.push_front(header_buf);
439
440 // "convert" message to asio buffer sequence
441 vector<boost::asio::const_buffer> send_sequence(packet.length());
442 for ( int i=0; i < packet.length(); i++ )
443 {
444 shared_buffer_t b = packet.at(i);
445 send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
446 }
447
448 // * async write *
449 boost::asio::async_write(
450 this->sock,
451 send_sequence,
452 boost::bind(
453 &rfcomm_transport::rfcomm_connection::async_write_handler,
454 this->shared_from_this(),
455 packet, // makes sure our shared pointer lives long enough ;-)
456 boost::asio::placeholders::error,
457 boost::asio::placeholders::bytes_transferred)
458 );
459 }
460}
461
462}} // namespace ariba::transport
463
464#endif /* HAVE_LIBBLUETOOTH */
Note: See TracBrowser for help on using the repository browser.