#include "rfcomm.hpp" #include "../asio/asio_io_service.h" #include "../asio/rfcomm.hpp" #include "../asio/bluetooth_endpoint.hpp" #include #include #include #include #include namespace ariba { namespace transport { using namespace boost::asio; using namespace detail; using namespace std; using boost::system::error_code; class link_data { public: uint8_t size_[4]; size_t size; uint8_t* buffer; }; class link_info { public: link_info(io_service& io ) : up(false), local(), remote(), socket(io), size(0), buffer(NULL), sending(false) { } // state bool up; rfcomm_endpoint local, remote; bluetooth::rfcomm::socket socket; // read buffer uint8_t size_[4]; size_t size; uint8_t* buffer; // send buffer bool sending; boost::mutex mutex; std::deque send_buffer; }; inline bluetooth::rfcomm::endpoint convert( const rfcomm_endpoint& endpoint ) { return bluetooth::rfcomm::endpoint( endpoint.mac().bluetooth(), endpoint.channel().value() ); } inline rfcomm_endpoint convert( const bluetooth::rfcomm::endpoint& endpoint ) { mac_address mac; mac.bluetooth( endpoint.address() ); rfcomm_channel_address channel; channel.value( endpoint.channel() ); return rfcomm_endpoint( mac, channel ); } rfcomm::rfcomm(uint16_t channel) : channel(channel), io(asio_io_service::alloc()) { } rfcomm::~rfcomm() { asio_io_service::free(); } void rfcomm::start() { // start io service asio_io_service::start(); // create acceptor cout << "Binding to channel " << channel << endl; acceptor = new bluetooth::rfcomm::acceptor(io, bluetooth::rfcomm::endpoint(bluetooth::rfcomm::get(), channel ) ); // start accepting start_accept(); } void rfcomm::stop() { } void rfcomm::send(const address_v* remote, const uint8_t* data, size_t size) { // get end-point rfcomm_endpoint endpoint = *remote; endpoint = convert(convert(endpoint)); // try to find established connector link_info* info = NULL; for (size_t i = 0; i < links.size(); i++) if (links[i]->remote.mac() == endpoint.mac()) { info = links[i]; break; } // not found? ->try to connect if (info==NULL) { cout << "Connecting to " << endpoint.to_string() << endl; info = new link_info(io); info->socket.async_connect( convert(endpoint), boost::bind( &rfcomm::handle_connect, this, boost::asio::placeholders::error, info )); asio_io_service::start(); } // copy message link_data ldata; ldata.size = size; ldata.size_[0] = (size >> 24) & 0xFF; ldata.size_[1] = (size >> 16) & 0xFF; ldata.size_[2] = (size >> 8) & 0xFF; ldata.size_[3] = (size >> 0) & 0xFF; ldata.buffer = new uint8_t[size]; memcpy(ldata.buffer, data, size); // enqueue message info->mutex.lock(); info->send_buffer.push_back(ldata); info->mutex.unlock(); // start writing start_write(info); } void rfcomm::send(const endpoint_set& endpoints, const uint8_t* data, size_t size) { // send a message to each combination of mac-address and channel BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) { BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) { rfcomm_endpoint endpoint(mac, channel); address_vf vf = endpoint; send(vf,data,size); } } } void rfcomm::terminate(const address_v* local, const address_v* remote) { // not supported right now! } void rfcomm::register_listener(transport_listener* listener) { this->listener = listener; } void rfcomm::start_accept() { cout << "Waiting for connections ..." << endl; // start accepting a connection link_info* info = new link_info(io); acceptor->async_accept(info->socket, boost::bind( // bind parameters &rfcomm::handle_accept, this, // handler parameters boost::asio::placeholders::error, info )); asio_io_service::start(); } void rfcomm::handle_accept(const error_code& error, link_info* info) { if (error) { cout << "Error accepting" << endl; delete info; return; } // convert endpoints info->up = true; info->local = convert( info->socket.local_endpoint() ); info->remote = convert( info->socket.remote_endpoint() ); cout << "Accepting incoming connection from " << info->remote.to_string() << endl; // add to list links_mutex.lock(); links.push_back(info); links_mutex.unlock(); // start i/o start_read(info); start_write(info); // restart accept start_accept(); } void rfcomm::handle_connect( const error_code& error, link_info* info ) { if (error) { cout << "Error connecting ..." << endl; delete info; return; } // convert endpoints info->up = true; info->local = convert( info->socket.local_endpoint() ); info->remote = convert( info->socket.remote_endpoint() ); cout << "Connected to " << info->remote.to_string() << endl; // add to list links_mutex.lock(); links.push_back(info); links_mutex.unlock(); // start i/o start_read(info); start_write(info); } void rfcomm::start_read(link_info* info) { cout << "Waiting for messages..." << endl; // start read boost::asio::async_read(info->socket, // read size of packet boost::asio::buffer(info->size_, 4), // bind handler boost::bind( // bind parameters &rfcomm::handle_read_header, this, // handler parameters placeholders::error, placeholders::bytes_transferred, info ) ); } void rfcomm::handle_read_header(const error_code& error, size_t bytes, link_info* info) { // ignore errors and wait for all data to be received if (error || bytes != 4) return; // get size info->size = (info->size_[0]<<24) + (info->size_[1] << 16) + (info->size_[2]<< 8) + (info->size_[3] << 0); cout << "receive message of size " << info->size << endl; // allocate buffer info->buffer = new uint8_t[info->size]; // start read boost::asio::async_read(info->socket, // read size of packet boost::asio::buffer(info->buffer, info->size), // bind handler boost::bind( // bind parameters &rfcomm::handle_read_data, this, // handler parameters placeholders::error, placeholders::bytes_transferred, info ) ); } void rfcomm::handle_read_data(const error_code& error, size_t bytes, link_info* info) { // ignore errors and wait for all data to be received if (error || bytes != info->size) return; cout << "received message of size " << info->size << endl; // deliver data listener->receive_message(this, info->local, info->remote, info->buffer, info->size ); // free buffers and reset size buffer delete [] info->buffer; for (size_t i=0; i<4; i++) info->size_[i] = 0; } void rfcomm::start_write( link_info* info ) { // do not start writing if sending is in progress if (info->sending || !info->up || info->send_buffer.size()==0) return; cout << "Sending messages..." << endl; // safely remove data from deque info->mutex.lock(); link_data data = info->send_buffer.front(); info->send_buffer.pop_front(); info->mutex.unlock(); boost::array buffer; buffer[0] = boost::asio::buffer(data.size_,4); buffer[1] = boost::asio::buffer(data.buffer,data.size); // start writing boost::asio::async_write(info->socket, // read size of packet buffer, // bind handler boost::bind( // bind parameters &rfcomm::handle_write_data, this, // handler parameters placeholders::error, placeholders::bytes_transferred, info, data.size, data.buffer ) ); } void rfcomm::handle_write_data(const error_code& error, size_t bytes, link_info* info, size_t size, uint8_t* buffer ) { // ignore errors and wait for all data to be sent if (error || bytes != (size+4) ) { if (error) cout << "Message sent error" << endl; return; } cout << "Message sent" << endl; // free buffer delete [] buffer; // restart-write start_write(info); } }} // namespace ariba::transport