#include "StreamTransport.hpp" // ariba #include "ariba/utility/addressing2/tcpip_endpoint.hpp" // boost-adaption #include "ariba/utility/transport/rfcomm/bluetooth_rfcomm.hpp" // boost #include #include #include // interface discovery for link-local destinations (tcp-only) #include namespace ariba { namespace transport { //use_logging_cpp(StreamTransport) #ifdef HAVE_LIBBLUETOOTH using boost::asio::bluetooth::rfcomm; #endif /* HAVE_LIBBLUETOOTH */ using namespace ariba::addressing2; using ariba::utility::LinkID; using boost::asio::ip::tcp; using boost::asio::ip::address_v6; using boost::shared_ptr; typedef boost::mutex::scoped_lock unique_lock; template StreamTransport::StreamTransport( const typename T::endpoint& endp ) : listener(NULL), acceptor(u_io_service.get_asio_io_service(), endp) { } template StreamTransport::~StreamTransport(){} template void StreamTransport::start() { // open server socket accept(); u_io_service.start(); } template void StreamTransport::stop() { acceptor.close(); u_io_service.stop(); } /* see header file for comments */ template void StreamTransport::send( const typename T::endpoint& dest_addr, reboost::message_t message, uint8_t priority) { ConnPtr conn; bool need_to_connect = false; { unique_lock lock(connections_lock); // try to find existing connection to this endpoint typename ConnectionMap::iterator it = connections.find(dest_addr); // BRANCH: create new connection if (it == connections.end()) { ConnPtr tmp_ptr( new StreamConnection( u_io_service.get_asio_io_service(), this->shared_from_this() ) ); conn = tmp_ptr; // save partner endpoint conn->partner_endpoint = dest_addr; // Note: starting the send is the obligation of the connect_handler // (avoids trying to send while not connected yet) conn->sending = true; need_to_connect = true; typename ConnectionMap::value_type item(dest_addr, conn); connections.insert(item); } // BRANCH: use existing connection else { conn = it->second; } } // * the actual send * conn->enqueue_for_sending(message, priority); // if new connection connect to the other party if ( need_to_connect ) { conn->sock.async_connect( dest_addr, boost::bind( &StreamConnection::async_connect_handler, conn, boost::asio::placeholders::error)); } } // TODO is this a private function? ... and still needed? /* see header file for comments */ template void StreamTransport::send( const EndpointPtr remote, reboost::message_t message, uint8_t priority) { if ( remote->get_category() == endpoint_category::TCPIP ) { const addressing2::tcpip_endpoint* endp = static_cast(remote.get()); send(endp->to_asio(), message, priority); } // else // TODO what now? } template void StreamTransport::register_listener( transport_listener* listener ) { this->listener = listener; } // XXX DEPRECATED template void StreamTransport::terminate( const EndpointPtr remote ) { if ( remote->get_category() == endpoint_category::TCPIP ) { const addressing2::tcpip_endpoint* endp = static_cast(remote.get()); this->terminate(endp->to_asio()); } } template void StreamTransport::terminate( const typename T::endpoint& remote ) { ConnPtr conn; // find and forget connection { unique_lock lock(connections_lock); typename ConnectionMap::iterator it = connections.find(remote); if (it == connections.end()) { return; } conn = it->second; // prevent upper layers from using this link conn->valid = false; connections.erase(it); } // XXX aktuell // cout << "/// MARIO: TCP/IP TERMINATE: " << conn->partner_endpoint << endl; // notify higher layers if ( listener ) { listener->connection_terminated(conn); } // XXX debug aktuell // cout << "/// MARIO Open connections:" << endl; // for ( typename ConnectionMap::iterator it = connections.begin(); it != connections.end(); ++it ) // { // cout << " CONNECTION: " << it->second->local_endpoint << " <---> " << it->second->partner_endpoint << endl; // cout << " used by: " << endl; // // int usecount = 0; // ConnPtr xx; // std::vector links = it->second->get_communication_links(); // for ( std::vector::iterator it2 = links.begin(); it2 != links.end(); ++it2 ) // { // cout << " - " << *it2 << endl; // usecount++; // } // if ( usecount == 0 ) // { // cout << " ---> NOBODY !!" << endl; // } // } // cout << "/// -------" << endl; // close connection boost::system::error_code ec; conn->sock.shutdown(T::socket::shutdown_both, ec); conn->sock.close(ec); } /* private */ template void StreamTransport::accept() { // create new connection object ConnPtr conn( new StreamConnection( u_io_service.get_asio_io_service(), this->shared_from_this() ) ); // wait for incoming connection acceptor.async_accept( conn->sock, boost::bind(&self::async_accept_handler, this->shared_from_this(), conn, boost::asio::placeholders::error) ); } template void StreamTransport::async_accept_handler(ConnPtr conn, const error_code& error) { if ( ! error ) { // save partner endpoint conn->partner_endpoint = conn->sock.remote_endpoint(); { unique_lock lock(connections_lock); typename ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn); connections.insert(item); } // read conn->listen(); } // accept further connections accept(); } /*------------------ | specializations | ------------------*/ /* TCP */ template <> void StreamTransport::send( const addressing2::const_EndpointSetPtr endpoints, reboost::message_t message, uint8_t priority ) { // network interfaces scope_ids, for link-local connections (lazy initialization) vector scope_ids; // send a message to each combination of address-address and port BOOST_FOREACH( const TcpIP_EndpointPtr address, endpoints->get_tcpip_endpoints() ) // vector endpoint_vec = endpoints->get_tcpip_endpoints(); // for ( vector::iterator it = endpoint_vec.begin(); it != endpoint_vec.end(); ++it ) { tcp::endpoint endp = address->to_asio(); // special treatment for link local addresses // ---> send over all (suitable) interfaces if ( endp.address().is_v6() ) { boost::asio::ip::address_v6 v6_addr = endp.address().to_v6(); if ( v6_addr.is_link_local() ) { // initialize scope_ids if ( scope_ids.size() == 0 ) scope_ids = get_interface_scope_ids(); BOOST_FOREACH ( uint64_t id, scope_ids ) { v6_addr.scope_id(id); endp.address(v6_addr); // logging_debug("------> SEND TO (link-local): " << endp); // * send * send(endp, message, priority); } } continue; } // * send * send(endp, message, priority); } } /* RFCOMM */ #ifdef HAVE_LIBBLUETOOTH // TODO // template <> // void StreamTransport::send( // const endpoint_set& endpoints, // reboost::message_t message, // uint8_t priority ) // { // // send a message to each combination of address-address and port // BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) { // BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) { // rfcomm::endpoint endp(mac.bluetooth(), channel.value()); // // // * send * // send(endp, message, priority); // } // } // } #endif /* HAVE_LIBBLUETOOTH */ /***************** ** inner class ** *****************/ template StreamTransport::StreamConnection::StreamConnection(boost::asio::io_service & io_service, StreamTransportPtr parent) : sock(io_service), valid(true), parent(parent), out_queues(8), //TODO How much priorities shall we have? sending(false) { header.length = 0; } /*------------------------------------------- | implement transport_connection interface | -------------------------------------------*/ template bool StreamTransport::StreamConnection::send( reboost::message_t message, uint8_t priority) { if ( ! valid ) { // XXX aktuell // cout << "/// MARIO: USED INVALID LINK: " << this->partner_endpoint << endl; return false; } // * enqueue data * enqueue_for_sending(message, priority); return true; } template EndpointPtr StreamTransport::StreamConnection::getLocalEndpoint() { EndpointPtr ret(new addressing2::tcpip_endpoint(local_endpoint)); return ret; } template EndpointPtr StreamTransport::StreamConnection::getRemoteEndpoint() { EndpointPtr ret(new addressing2::tcpip_endpoint(partner_endpoint)); return ret; } template void StreamTransport::StreamConnection::register_communication_link(LinkID* link) { if ( ! link ) return; // add link communication_links.push_back(link); } template void StreamTransport::StreamConnection::unregister_communication_link(LinkID* link) { if ( ! link ) return; // remove link { std::vector::iterator it = communication_links.begin(); while ( it != communication_links.end() ) { if ( (*it) == link ) { it = communication_links.erase(it); } else { ++it; } } } // this connection is no longer used by any link if ( communication_links.empty() ) { //XXX // cout << "communication_links.empty() " << getLocalEndpoint()->to_string() << " - " << getRemoteEndpoint()->to_string() << endl; // terminate connection this->terminate(); // TODO aktuell /* * TODO racecondition: * When receiving a link request, the connection could closed * before the request is handled. * * ---> Maybe wait a timeout before actually terminate the connection. * (e.g. record last-used time: * if last used > timeout and communication_links.empty() * then terminate the connection) */ } } template std::vector StreamTransport::StreamConnection::get_communication_links() { return communication_links; } template void StreamTransport::StreamConnection::terminate() { parent->terminate(partner_endpoint); } /*------------------------------ | things we defined ourselves | ------------------------------*/ template void StreamTransport::StreamConnection::async_connect_handler(const error_code& error) { if (error) { parent->terminate(partner_endpoint); return; } // save local endpoint local_endpoint = sock.local_endpoint(); // Note: sending has to be true at this point send_next_package(); listen(); } template void StreamTransport::StreamConnection::listen() { boost::asio::async_read( this->sock, boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)), boost::bind( &StreamTransport::StreamConnection::async_read_header_handler, this->shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ); } template void StreamTransport::StreamConnection::async_read_header_handler(const error_code& error, size_t bytes_transferred) { if (error) { parent->terminate(partner_endpoint); return; } // sanity checks // NOTE: max size 8k (may be changed later..) if ( header.length == 0 || header.length > 8192 ) { parent->terminate(partner_endpoint); } // new buffer for the new packet buffy = shared_buffer_t(header.length); // * read data * boost::asio::async_read( this->sock, boost::asio::buffer(buffy.mutable_data(), buffy.size()), boost::bind( &StreamTransport::StreamConnection::async_read_data_handler, this->shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ); } template void StreamTransport::StreamConnection::async_read_data_handler( const error_code& error, size_t bytes_transferred) { if (error) { parent->terminate(partner_endpoint); return; } if ( parent->listener ) parent->listener->receive_message(this->shared_from_this(), buffy); buffy = shared_buffer_t(); listen(); } /* see header file for comments */ template void StreamTransport::StreamConnection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred) { if ( error ) { // remove this connection parent->terminate(partner_endpoint); return; } send_next_package(); } template void StreamTransport::StreamConnection::enqueue_for_sending(Packet packet, uint8_t priority) { bool restart_sending = false; // debugging --> copy message (instead of zero copy) // reboost::shared_buffer_t buff = packet.linearize(); // reboost::message_t msg; // msg.push_back(buff); // assert ( msg.MAGIC_NUMBER == 421337 ); // [ debugging ] // enqueue packet [locked] { unique_lock lock(out_queues_lock); assert ( this->valid ); assert( priority < out_queues.size() ); // * enqueue * out_queues[priority].push(packet); if ( ! sending ) { restart_sending = true; sending = true; } } // if sending was stopped, we have to restart it here if ( restart_sending ) { send_next_package(); } } /* see header file for comments */ template void StreamTransport::StreamConnection::send_next_package() { Packet packet; bool found = false; // I'm not sure if this can actually happen.. But let's be on the save side, here. if ( ! this->valid ) { this->sending = false; return; } // find packet with highest priority [locked] { unique_lock lock(out_queues_lock); for ( vector::iterator it = out_queues.begin(); it != out_queues.end(); it++ ) { if ( !it->empty() ) { packet = it->front(); it->pop(); found = true; break; } } // no packets waiting --> stop sending if ( ! found ) { sending = false; } } // * send * if ( found ) { reboost::shared_buffer_t header_buf(sizeof(header_t)); header_t* header = (header_t*)(header_buf.mutable_data()); header->length = packet.size(); packet.push_front(header_buf); // "convert" message to asio buffer sequence vector send_sequence(packet.length()); for ( int i=0; i < packet.length(); i++ ) { shared_buffer_t b = packet.at(i); send_sequence.push_back(boost::asio::buffer(b.data(), b.size())); } // * async write * boost::asio::async_write( this->sock, send_sequence, boost::bind( &StreamTransport::StreamConnection::async_write_handler, this->shared_from_this(), packet, // makes sure our shared pointer lives long enough ;-) boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); } } /********************* ** [ inner class ] ** *********************/ // explicitly tell the compiler to create a »tcp« (and »rfcomm«) specialization // --> (needed since hpp and cpp are separated) template class StreamTransport; #ifdef HAVE_LIBBLUETOOTH template class StreamTransport; #endif /* HAVE_LIBBLUETOOTH */ ///////////////////////////////////////////////////////////////////////////////////// // XXX testing ///** // * Conversion between ASIO Adresses and ARIBA adresses // */ ///* TCP */ //template <> //inline typename tcp::endpoint convert_address( const address_v* address ) //{ // tcpip_endpoint endpoint = *address; // // return typename tcp::endpoint( // endpoint.address().asio(), endpoint.port().value() // ); //} // //template <> //inline EndpointPtr convert_address(const typename tcp::endpoint& endpoint) //{ // ip_address address; // address.asio(endpoint.address()); // tcp_port_address port; // port.value(endpoint.port()); // //// new tcpip_endpoint(address, port); // tcpip_endpoint xx; // address_vf yy; // address_v* zz = yy->clone(); // address_v::shared_ptr endp(zz); // XXX // // return endp; //} // ///* RFCOMM */ //#ifdef HAVE_LIBBLUETOOTH // template <> // inline typename rfcomm::endpoint convert_address( const address_v* address ) // { // rfcomm_endpoint endpoint = *address; // // return rfcomm::endpoint( // endpoint.mac().bluetooth(), endpoint.channel().value() // ); // } // // template <> // inline address_v::shared_ptr convert_address(const typename rfcomm::endpoint& endpoint) // { // mac_address mac; // mac.bluetooth(endpoint.address()); // rfcomm_channel_address channel; // channel.value(endpoint.channel()); // // address_v::shared_ptr endp((ariba::addressing::address_v*) new rfcomm_endpoint(mac, channel)); // // return endp; // } //#endif /* HAVE_LIBBLUETOOTH */ ///////////////////////////////////////////////////////////////////////////////////// /* * Get Ethernet scope ids (for link-local) */ vector get_interface_scope_ids() { vector ret; struct ifaddrs* ifaceBuffer = NULL; void* tmpAddrPtr = NULL; int ok = getifaddrs( &ifaceBuffer ); if( ok != 0 ) return ret; for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { // ignore devices that are disabled or have no ip if(i == NULL) continue; struct sockaddr* addr = i->ifa_addr; if (addr==NULL) continue; // only use ethX and wlanX devices string device = string(i->ifa_name); if ( (device.find("eth") == string::npos) && (device.find("wlan") == string::npos) /* && (device.find("lo") == string::npos) XXX */ ) { continue; } // only use interfaces with ipv6 link-local addresses if (addr->sa_family == AF_INET6) { // convert address // TODO should be possible without detour over strings char straddr[INET6_ADDRSTRLEN]; tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); address_v6 v6addr = address_v6::from_string(straddr); if ( v6addr.is_link_local() ) { // * append the scope_id to the return vector * ret.push_back(if_nametoindex(i->ifa_name)); } } } freeifaddrs(ifaceBuffer); return ret; } }} // namespace ariba::transport