An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/rfcomm/rfcomm.cpp @ 5993

Last change on this file since 5993 was 5993, checked in by Christoph Mayer, 14 years ago

sigcomm release

File size: 9.8 KB
Line 
1#include "rfcomm.hpp"
2
3#include "ariba/utility/transport/asio/asio_io_service.h"
4#include "ariba/utility/transport/asio/rfcomm.hpp"
5#include "ariba/utility/transport/asio/bluetooth_endpoint.hpp"
6
7#include <boost/asio.hpp>
8#include <boost/thread.hpp>
9#include <boost/array.hpp>
10#include <memory.h>
11#include <deque>
12#include <cerrno>
13
14namespace ariba {
15namespace transport {
16
17use_logging_cpp(rfcomm)
18
19using namespace boost::asio;
20using namespace detail;
21using namespace std;
22
23using boost::system::error_code;
24
25class link_data {
26public:
27        uint8_t size_[4];
28        size_t size;
29        uint8_t* buffer;
30};
31
32class link_info {
33public:
34        link_info(io_service& io ) :
35                io(io), up(false), connecting(false), local(), remote(), socket(new bluetooth::rfcomm::socket(io)), connect_retries(0),
36                size(0), buffer(NULL), sending(false) {
37        }
38
39        ~link_info() {
40                delete socket;
41        }
42
43        void reinit() {
44                delete socket;
45                socket = new bluetooth::rfcomm::socket(io);
46                up = false;
47        }
48
49        io_service& io;
50
51        // state
52        bool up;
53        bool connecting;
54        rfcomm_endpoint local, remote;
55        bluetooth::rfcomm::socket* socket;
56        int connect_retries;
57
58        // read buffer
59        uint8_t size_[4];
60        size_t size;
61        uint8_t* buffer;
62
63        // send buffer
64        bool sending;
65        boost::mutex mutex;
66        std::deque<link_data> send_buffer;
67};
68
69void rfcomm::shutdown(link_info* info) {
70        if (info != NULL && info->up) {
71                info->up = false;
72                info->socket->shutdown( bluetooth::rfcomm::socket::shutdown_both );
73        }
74}
75
76
77inline bluetooth::rfcomm::endpoint convert( const rfcomm_endpoint& endpoint ) {
78        return bluetooth::rfcomm::endpoint(
79                endpoint.mac().bluetooth(), endpoint.channel().value()
80        );
81}
82
83inline rfcomm_endpoint convert( const bluetooth::rfcomm::endpoint& endpoint ) {
84        mac_address mac;
85        mac.bluetooth( endpoint.address() );
86        rfcomm_channel_address channel;
87        channel.value( endpoint.channel() );
88        return rfcomm_endpoint( mac, channel );
89}
90
91
92rfcomm::rfcomm(uint16_t channel) :
93        channel(channel), io(asio_io_service::alloc()) {
94        accept_retries = 0;
95}
96
97rfcomm::~rfcomm() {
98        asio_io_service::free();
99}
100
101void rfcomm::start() {
102
103        // start io service
104        asio_io_service::start();
105
106        // create acceptor
107        logging_info( "Binding to channel " << channel );
108        acceptor = new bluetooth::rfcomm::acceptor(io,
109                bluetooth::rfcomm::endpoint(bluetooth::rfcomm::get(), channel )
110        );
111
112        send_data = new link_data();
113
114        // start accepting
115        start_accept();
116}
117
118void rfcomm::stop() {
119        logging_info( "Stopping asio rfcomm" );
120}
121
122void rfcomm::send(const address_v* remote, const uint8_t* data, size_t size) {
123
124        // get end-point
125        rfcomm_endpoint endpoint = *remote;
126        endpoint = convert(convert(endpoint));
127
128        // try to find established connector
129        logging_debug("Trying to find a already existing link.");
130        link_info* info = NULL;
131        for (size_t i = 0; i < links.size(); i++)
132                if (links[i]->remote.mac() == endpoint.mac()) {
133                        logging_debug("Using already established link");
134                        info = links[i];
135                        break;
136                }
137
138        // not found, or not up? ->try to (re-)connect
139        if (info==NULL || ((!info->up || !info->socket->is_open()) && !info->connecting) ) {
140                logging_debug( "Connecting to " << endpoint.to_string() );
141                if (info != NULL && (!info->up || !info->socket->is_open())) {
142                        logging_error("Old link is down. Trying to re-establish link.");
143                        info->reinit();
144                } else {
145                        info = new link_info(io);
146                }
147                info->connect_retries = 0;
148                info->remote = endpoint;
149                info->connecting = true;
150                info->socket->async_connect( convert(endpoint), boost::bind(
151                        &rfcomm::handle_connect, this,
152                        boost::asio::placeholders::error, info
153                ));
154                links.push_back(info);
155        }
156
157        // copy message
158        link_data ldata;
159        ldata.size = size;
160        ldata.size_[0] = (size >> 24) & 0xFF;
161        ldata.size_[1] = (size >> 16) & 0xFF;
162        ldata.size_[2] = (size >> 8) & 0xFF;
163        ldata.size_[3] = (size >> 0) & 0xFF;
164        ldata.buffer = new uint8_t[size];
165        memcpy(ldata.buffer, data, size);
166
167        // enqueue message
168        info->mutex.lock();
169        info->send_buffer.push_back(ldata);
170        info->mutex.unlock();
171
172        // start writing
173        io.post( boost::bind( &rfcomm::start_write, this, info) );
174}
175
176void rfcomm::send(const endpoint_set& endpoints, const uint8_t* data, size_t size) {
177        // send a message to each combination of mac-address and channel
178        BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
179                BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
180                        rfcomm_endpoint endpoint(mac, channel);
181                        address_vf vf = endpoint;
182                        send(vf,data,size);
183                }
184        }
185}
186
187void rfcomm::terminate( const address_v* remote) {
188        // get end-point
189        rfcomm_endpoint endpoint = *remote;
190
191        for (size_t i = 0; i < links.size(); i++)
192                if (links[i]->remote.mac() == endpoint.mac()) {
193                        // close socket
194                        shutdown(links[i]);
195                        break;
196                }
197}
198
199void rfcomm::register_listener(transport_listener* listener) {
200        this->listener = listener;
201}
202
203void rfcomm::start_accept() {
204
205        logging_info( "Waiting for connections ..." );
206
207        // start accepting a connection
208        link_info* info = new link_info(io);
209        acceptor->async_accept(*info->socket, boost::bind(
210                // bind parameters
211                &rfcomm::handle_accept, this,
212
213                // handler parameters
214                boost::asio::placeholders::error, info
215        ));
216        asio_io_service::start();
217}
218
219void rfcomm::handle_accept(const error_code& error, link_info* info) {
220        if (error) {
221                logging_error( "Error waiting for new connections. Error code: "<< error.message()
222                                << ", trying to recover (attempt " << accept_retries << ")");
223
224                // restart accepting
225                if (accept_retries<3) {
226                        accept_retries++;
227                        start_accept();
228                } else
229                        delete info;
230
231                return;
232        }
233
234        links_mutex.lock();
235
236        // convert endpoints
237        info->up = true;
238        info->local  = convert( info->socket->local_endpoint()  );
239        info->remote = convert( info->socket->remote_endpoint() );
240
241        logging_debug("Accepted incoming connection from "
242                << info->remote.to_string() );
243
244        // add to list
245        links.push_back(info);
246        links_mutex.unlock();
247
248        // start i/o
249        start_read(info);
250        start_write(info);
251
252        // restart accept
253        start_accept();
254}
255
256void rfcomm::handle_connect( const error_code& error, link_info* info ) {
257        if (error) {
258                logging_error( "Can not connect. Error code: "
259                                << error.message() << " Retrying ... "
260                                "(attempt " << info->connect_retries << ")" );
261
262                // do we retry this connection? yes->
263                if (info->connect_retries<3) {
264                        // increase counter
265                        info->connecting = false;
266                        info->connect_retries++;
267                        info->reinit();
268
269                        // retry connection attempt
270                        info->socket->async_connect( convert(info->remote), boost::bind(
271                                &rfcomm::handle_connect, this,
272                                boost::asio::placeholders::error, info
273                        ));
274
275                } else { // no-> delete link and stop
276                        return;
277                }
278        }
279
280        // convert endpoints
281        info->up = true;
282        info->connecting = false;
283        info->local  = convert( info->socket->local_endpoint()  );
284        info->remote = convert( info->socket->remote_endpoint() );
285
286        logging_debug( "Connected to " << info->remote.to_string() );
287
288        // add to list
289        links_mutex.lock();
290        links.push_back(info);
291        links_mutex.unlock();
292
293        // start i/o
294        start_read(info);
295        start_write(info);
296}
297
298void rfcomm::start_read(link_info* info) {
299        logging_debug("Start reading ...");
300
301        // start read
302        boost::asio::async_read(*info->socket,
303
304                // read size of packet
305                boost::asio::buffer(info->size_, 4),
306
307                // bind handler
308                boost::bind(
309
310                        // bind parameters
311                        &rfcomm::handle_read_header, this,
312
313                        // handler parameters
314                        placeholders::error, placeholders::bytes_transferred, info
315                )
316        );
317}
318
319void rfcomm::handle_read_header(const error_code& error, size_t bytes,
320        link_info* info) {
321
322        // handle error
323        if (error) {
324                logging_error("Failed to receive message payload. Error code: "
325                                << error.message() );
326                shutdown(info);
327                return;
328        }
329
330        // ignore errors and wait for all data to be received
331        if (bytes != 4) return;
332
333        // get size
334        info->size = (info->size_[0]<<24) + (info->size_[1] << 16) +
335                        (info->size_[2]<< 8) + (info->size_[3] << 0);
336
337        // allocate buffer
338        info->buffer = new uint8_t[info->size];
339
340        // start read
341        boost::asio::async_read(*info->socket,
342                // read size of packet
343                boost::asio::buffer(info->buffer, info->size),
344                // bind handler
345                boost::bind(
346                        // bind parameters
347                        &rfcomm::handle_read_data, this,
348                        // handler parameters
349                        placeholders::error, placeholders::bytes_transferred, info
350                )
351        );
352}
353
354void rfcomm::handle_read_data(const error_code& error, size_t bytes,
355        link_info* info) {
356
357        // check error
358        if (error) {
359                logging_error("Failed to receive message payload. Error: " << error.message() );
360                shutdown(info);
361                return;
362        }
363
364        // wait for all data to be received
365        if (bytes != info->size)
366                return;
367
368        // deliver data
369        listener->receive_message(this, info->local, info->remote, info->buffer, info->size );
370
371        // free buffers and reset size buffer
372        delete [] info->buffer;
373        for (size_t i=0; i<4; i++) info->size_[i] = 0;
374
375        start_read(info);
376}
377
378void rfcomm::start_write( link_info* info ) {
379        // do not start writing if sending is in progress
380        if (info->sending || !info->up || info->send_buffer.size()==0) return;
381
382        // set sending flag
383        info->sending = true;
384
385        // safely remove data from deque
386        *send_data = info->send_buffer.front();
387        info->send_buffer.pop_front();
388
389        boost::array<boost::asio::mutable_buffer, 2> buffer;
390        buffer[0] = boost::asio::buffer(send_data->size_,4);
391        buffer[1] = boost::asio::buffer(send_data->buffer,send_data->size);
392
393        // start writing
394        boost::asio::async_write(*info->socket,
395                // read size of packet
396                buffer,
397                // bind handler
398                boost::bind(
399                        // bind parameters
400                        &rfcomm::handle_write_data, this,
401                        // handler parameters
402                        placeholders::error, placeholders::bytes_transferred,
403                        info, send_data->size, send_data->buffer
404                )
405        );
406}
407
408void rfcomm::handle_write_data(const error_code& error, size_t bytes,
409        link_info* info, size_t size, uint8_t* buffer ) {
410
411        // handle error
412        if (error) {
413                logging_error( "Message sent error. Error: " << error.message() );
414                info->sending = false;
415                shutdown(info);
416                return;
417        }
418
419        //  wait for all data to be sent
420        if (bytes != (size+4) )
421                return;
422
423        logging_debug( "Message sent" );
424
425        // free buffer
426        delete [] buffer;
427        info->sending = false;
428
429        // restart-write
430        start_write(info);
431}
432
433}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.