Index: source/ariba/utility/transport/StreamTransport/CMakeLists.txt
===================================================================
--- source/ariba/utility/transport/tcpip/CMakeLists.txt	(revision 10700)
+++ source/ariba/utility/transport/StreamTransport/CMakeLists.txt	(revision 12060)
@@ -37,5 +37,6 @@
 # [License]
 
-add_headers(tcpip.hpp)
+add_headers(StreamTransport.hpp)
 
-add_sources(tcpip.cpp)
+add_sources(StreamTransport.cpp)
+
Index: source/ariba/utility/transport/StreamTransport/StreamTransport.cpp
===================================================================
--- source/ariba/utility/transport/StreamTransport/StreamTransport.cpp	(revision 12060)
+++ source/ariba/utility/transport/StreamTransport/StreamTransport.cpp	(revision 12060)
@@ -0,0 +1,814 @@
+#include "StreamTransport.hpp"
+
+// ariba
+#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
+
+// boost-adaption
+#include "ariba/utility/transport/rfcomm/bluetooth_rfcomm.hpp"
+
+// boost
+#include <boost/foreach.hpp>
+#include <boost/array.hpp>
+#include <boost/asio/ip/address.hpp>
+
+// interface discovery for link-local destinations (tcp-only)
+#include <ifaddrs.h>
+
+
+namespace ariba {
+namespace transport {
+
+//use_logging_cpp(StreamTransport<T>)
+
+#ifdef HAVE_LIBBLUETOOTH
+    using boost::asio::bluetooth::rfcomm;
+#endif /* HAVE_LIBBLUETOOTH */
+
+using namespace ariba::addressing2;
+using ariba::utility::LinkID;
+
+using boost::asio::ip::tcp;
+using boost::asio::ip::address_v6;
+using boost::shared_ptr;
+
+typedef boost::mutex::scoped_lock unique_lock;
+
+
+template <class T>
+StreamTransport<T>::StreamTransport( const typename T::endpoint& endp )  :
+        listener(NULL),
+        acceptor(u_io_service.get_asio_io_service(), endp)
+{
+}
+
+template <class T>
+StreamTransport<T>::~StreamTransport(){}
+
+template <class T>
+void StreamTransport<T>::start()
+{
+    // open server socket
+    accept();
+    
+    u_io_service.start();
+}
+
+
+template <class T>
+void StreamTransport<T>::stop()
+{
+    acceptor.close();
+    
+    u_io_service.stop();
+}
+
+
+/* see header file for comments */
+template <class T>
+void StreamTransport<T>::send(
+        const typename T::endpoint& dest_addr,
+        reboost::message_t message,
+        uint8_t priority)
+{
+    ConnPtr conn;
+    bool need_to_connect = false;
+    
+    {
+        unique_lock lock(connections_lock);
+        
+        // try to find existing connection to this endpoint 
+        typename ConnectionMap::iterator it = connections.find(dest_addr);
+        
+        // BRANCH: create new connection
+        if (it == connections.end())
+        {
+            ConnPtr tmp_ptr(
+                    new StreamConnection(
+                            u_io_service.get_asio_io_service(),
+                            this->shared_from_this() )
+                    );
+            conn = tmp_ptr;
+
+            // save partner endpoint
+            conn->partner_endpoint = dest_addr;
+            
+            // Note: starting the send is the obligation of the connect_handler
+            // (avoids trying to send while not connected yet)
+            conn->sending = true;
+            need_to_connect = true;
+            
+            typename ConnectionMap::value_type item(dest_addr, conn);
+            connections.insert(item);
+            
+        }
+        // BRANCH: use existing connection
+        else
+        {
+            conn = it->second;
+        }
+    }
+    
+    
+    // * the actual send *
+    conn->enqueue_for_sending(message, priority);
+    
+    // if new connection connect to the other party
+    if ( need_to_connect )
+    {
+        conn->sock.async_connect(
+                dest_addr,
+                boost::bind(
+                        &StreamConnection::async_connect_handler,
+                        conn,
+                        boost::asio::placeholders::error));
+    }
+}
+
+
+// TODO is this a private function? ... and still needed?
+/* see header file for comments */
+template <class T>
+void StreamTransport<T>::send(
+        const EndpointPtr remote,
+        reboost::message_t message,
+        uint8_t priority)
+{
+    if ( remote->get_category() == endpoint_category::TCPIP )
+    {
+        const addressing2::tcpip_endpoint* endp =
+                static_cast<const addressing2::tcpip_endpoint*>(remote.get());
+        
+        send(endp->to_asio(), message, priority);
+    }
+    
+    // else
+    // TODO what now?
+}
+
+
+template <class T>
+void StreamTransport<T>::register_listener( transport_listener* listener )
+{
+    this->listener = listener;
+}
+
+
+// XXX DEPRECATED
+template <class T>
+void StreamTransport<T>::terminate( const EndpointPtr remote )
+{
+    if ( remote->get_category() == endpoint_category::TCPIP )
+    {
+        const addressing2::tcpip_endpoint* endp =
+                static_cast<const addressing2::tcpip_endpoint*>(remote.get());
+
+        
+        this->terminate(endp->to_asio());
+    }
+}
+
+template <class T>
+void StreamTransport<T>::terminate( const typename T::endpoint& remote )
+{
+    ConnPtr conn;
+    
+    // find and forget connection
+    {
+        unique_lock lock(connections_lock);
+        
+        typename ConnectionMap::iterator it = connections.find(remote);
+        if (it == connections.end())
+        {
+            return;
+        }
+        
+        conn = it->second;
+
+        // prevent upper layers from using this link
+        conn->valid = false;
+
+        connections.erase(it);
+    }
+    
+    // XXX aktuell
+//     cout << "/// MARIO: TCP/IP TERMINATE: " << conn->partner_endpoint << endl;
+    
+    // notify higher layers
+    if ( listener )
+    {
+        listener->connection_terminated(conn);
+    }
+
+    
+    // XXX debug aktuell
+//    cout << "/// MARIO Open connections:" << endl;
+//    for ( typename ConnectionMap::iterator it = connections.begin(); it != connections.end(); ++it )
+//    {
+//        cout << "  CONNECTION: " << it->second->local_endpoint << " <---> " << it->second->partner_endpoint << endl;
+//        cout << "    used by: " << endl;
+//        
+//        int usecount = 0;
+//        ConnPtr xx;
+//        std::vector<LinkID*> links = it->second->get_communication_links();
+//        for ( std::vector<LinkID*>::iterator it2 = links.begin(); it2 != links.end(); ++it2 )
+//        {
+//            cout << "      - " << *it2 << endl;
+//            usecount++;
+//        }
+//        if ( usecount == 0 )
+//        {
+//            cout << "      ---> NOBODY !!" << endl;
+//        }
+//    }
+//    cout << "/// -------" << endl;
+
+
+    // close connection
+    boost::system::error_code ec;
+    conn->sock.shutdown(T::socket::shutdown_both, ec);
+    conn->sock.close(ec);
+}
+
+
+/* private */
+template <class T>
+void StreamTransport<T>::accept()
+{
+    // create new connection object
+    ConnPtr conn(
+            new StreamConnection(
+                    u_io_service.get_asio_io_service(),
+                    this->shared_from_this()
+            )
+    );
+    
+    // wait for incoming connection
+    acceptor.async_accept(
+            conn->sock,
+            boost::bind(&self::async_accept_handler,
+                    this->shared_from_this(),
+                    conn,
+                    boost::asio::placeholders::error)
+    );
+}
+
+template <class T>
+void StreamTransport<T>::async_accept_handler(ConnPtr conn, const error_code& error)
+{
+    if ( ! error )
+    {
+        // save partner endpoint
+        conn->partner_endpoint = conn->sock.remote_endpoint();
+        
+        {
+            unique_lock lock(connections_lock);
+            
+            typename ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
+            connections.insert(item);
+        }
+        
+        // read
+        conn->listen();
+    }
+    
+    // accept further connections
+    accept();
+}
+
+
+
+/*------------------
+ | specializations |
+ ------------------*/
+/* TCP */
+template <>
+void StreamTransport<tcp>::send(
+        const addressing2::const_EndpointSetPtr endpoints,
+        reboost::message_t message,
+        uint8_t priority )
+{
+    // network interfaces scope_ids, for link-local connections (lazy initialization)
+    vector<uint64_t> scope_ids;
+    
+    // send a message to each combination of address-address and port
+    BOOST_FOREACH( const TcpIP_EndpointPtr address, endpoints->get_tcpip_endpoints() )
+//    vector<TcpIP_EndpointPtr> endpoint_vec = endpoints->get_tcpip_endpoints();
+//    for ( vector<TcpIP_EndpointPtr>::iterator it = endpoint_vec.begin(); it != endpoint_vec.end(); ++it )
+    {
+        tcp::endpoint endp = address->to_asio();
+        
+        // special treatment for link local addresses
+        //   ---> send over all (suitable) interfaces
+        if ( endp.address().is_v6() )
+        {
+            boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
+            
+            if ( v6_addr.is_link_local() )
+            {
+                // initialize scope_ids
+                if ( scope_ids.size() == 0 )
+                    scope_ids = get_interface_scope_ids();
+                
+                BOOST_FOREACH ( uint64_t id, scope_ids )
+                {                        
+                    v6_addr.scope_id(id);
+                    endp.address(v6_addr);
+
+//                    logging_debug("------> SEND TO (link-local): " << endp);
+                    // * send *
+                    send(endp, message, priority);
+                }
+            }
+            
+            continue;
+        }
+        
+        // * send *
+        send(endp, message, priority);
+    }
+}
+
+
+/* RFCOMM */
+#ifdef HAVE_LIBBLUETOOTH
+
+// TODO
+
+//    template <>
+//    void StreamTransport<rfcomm>::send(
+//            const endpoint_set& endpoints,
+//            reboost::message_t message,
+//            uint8_t priority )
+//    {
+//        // send a message to each combination of address-address and port
+//        BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
+//            BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
+//                rfcomm::endpoint endp(mac.bluetooth(), channel.value());
+//                
+//                // * send *
+//                send(endp, message, priority);
+//            }
+//        }
+//    }
+
+#endif /* HAVE_LIBBLUETOOTH */
+
+
+
+/*****************
+ ** inner class **
+ *****************/
+
+template <class T>
+StreamTransport<T>::StreamConnection::StreamConnection(boost::asio::io_service & io_service, StreamTransportPtr parent)  :
+        sock(io_service),
+        valid(true),
+        parent(parent),
+        out_queues(8), //TODO How much priorities shall we have?
+        sending(false)
+{
+        header.length = 0;
+}
+
+/*-------------------------------------------
+ | implement transport_connection interface |
+ -------------------------------------------*/
+template <class T>
+bool StreamTransport<T>::StreamConnection::send(
+        reboost::message_t message,
+        uint8_t priority)
+{
+    if ( ! valid )
+    {
+        // XXX aktuell
+//         cout << "/// MARIO: USED INVALID LINK: " << this->partner_endpoint << endl;
+
+        return false;
+    }
+    
+    // * enqueue data *
+    enqueue_for_sending(message, priority);
+    
+    return true;
+}
+
+
+template <class T>
+EndpointPtr StreamTransport<T>::StreamConnection::getLocalEndpoint()
+{
+    EndpointPtr ret(new addressing2::tcpip_endpoint(local_endpoint));
+    
+    return ret;
+}
+
+
+template <class T>
+EndpointPtr StreamTransport<T>::StreamConnection::getRemoteEndpoint()
+{
+    EndpointPtr ret(new addressing2::tcpip_endpoint(partner_endpoint));
+    
+    return ret;
+}
+
+template <class T>
+void StreamTransport<T>::StreamConnection::register_communication_link(LinkID* link)
+{
+    if ( ! link )
+        return;
+    
+    // add link
+    communication_links.push_back(link);
+}
+
+template <class T>
+void StreamTransport<T>::StreamConnection::unregister_communication_link(LinkID* link)
+{
+    if ( ! link )
+        return;
+
+    
+    // remove link
+    {
+        std::vector<LinkID*>::iterator it = communication_links.begin();
+        
+        while ( it != communication_links.end() )
+        {
+            if ( (*it) == link )
+            {
+                it = communication_links.erase(it);
+            }
+            else
+            {
+                ++it;
+            }
+        }
+    }
+    
+    // this connection is no longer used by any link
+    if ( communication_links.empty() )
+    {
+        //XXX
+//         cout << "communication_links.empty() " << getLocalEndpoint()->to_string() << " - " << getRemoteEndpoint()->to_string() << endl;
+        
+        // terminate connection
+        this->terminate();  // TODO aktuell
+        
+        /*
+         *  TODO racecondition:
+         *  When receiving a link request, the connection could closed
+         *  before the request is handled.
+         *  
+         *  ---> Maybe wait a timeout before actually terminate the connection.
+         *  (e.g. record last-used time:
+         *      if last used > timeout and communication_links.empty()
+         *      then terminate the connection)  
+         */
+    }
+}
+
+template <class T>
+std::vector<LinkID*> StreamTransport<T>::StreamConnection::get_communication_links()
+{
+    return communication_links;
+}
+
+
+template <class T>
+void StreamTransport<T>::StreamConnection::terminate()
+{
+    parent->terminate(partner_endpoint);
+}
+
+
+/*------------------------------
+ | things we defined ourselves |
+ ------------------------------*/
+template <class T>
+void StreamTransport<T>::StreamConnection::async_connect_handler(const error_code& error)
+{
+    if (error)
+    {
+        parent->terminate(partner_endpoint);
+
+        return;
+    }
+    
+    // save local endpoint
+    local_endpoint = sock.local_endpoint();
+    
+    // Note: sending has to be true at this point
+    send_next_package();
+    
+    listen();
+}
+
+
+template <class T>
+void StreamTransport<T>::StreamConnection::listen()
+{
+    boost::asio::async_read(
+            this->sock,
+            boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
+            boost::bind(
+                    &StreamTransport<T>::StreamConnection::async_read_header_handler,
+                    this->shared_from_this(),
+                    boost::asio::placeholders::error,
+                    boost::asio::placeholders::bytes_transferred
+            )
+    );
+}
+
+
+template <class T>
+void StreamTransport<T>::StreamConnection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
+{
+    if (error)
+    {
+        parent->terminate(partner_endpoint);
+
+        return;
+    }
+    
+    // sanity checks
+    //   NOTE: max size 8k (may be changed later..)
+    if ( header.length == 0 || header.length > 8192 )
+    {
+        parent->terminate(partner_endpoint);
+    }
+
+    
+    // new buffer for the new packet
+    buffy = shared_buffer_t(header.length);
+
+    // * read data *
+    boost::asio::async_read(
+            this->sock,
+            boost::asio::buffer(buffy.mutable_data(), buffy.size()),
+            boost::bind(
+                    &StreamTransport<T>::StreamConnection::async_read_data_handler,
+                    this->shared_from_this(),
+                    boost::asio::placeholders::error,
+                    boost::asio::placeholders::bytes_transferred
+            )
+    );
+}
+
+template <class T>
+void StreamTransport<T>::StreamConnection::async_read_data_handler(
+        const error_code& error, size_t bytes_transferred)
+{
+    if (error)
+    {
+        parent->terminate(partner_endpoint);
+
+        return;
+    }
+    
+    if ( parent->listener )
+        parent->listener->receive_message(this->shared_from_this(), buffy);
+
+    buffy = shared_buffer_t();
+    listen();
+}
+
+/* see header file for comments */
+template <class T>
+void StreamTransport<T>::StreamConnection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
+{
+    if ( error )
+    {        
+        // remove this connection
+        parent->terminate(partner_endpoint); 
+
+        return;
+    }
+    
+    send_next_package();
+}
+
+
+
+template <class T>
+void StreamTransport<T>::StreamConnection::enqueue_for_sending(Packet packet, uint8_t priority)
+{
+    bool restart_sending = false;
+    
+    // enqueue packet  [locked]
+    {
+        unique_lock(out_queues_lock);
+        
+        assert( priority < out_queues.size() );
+        out_queues[priority].push(packet);
+        
+        if ( ! sending )
+        {
+            restart_sending = true;
+            sending = true;
+        }
+    }
+    
+    // if sending was stopped, we have to restart it here
+    if ( restart_sending )
+    {
+        send_next_package();
+    }
+}
+
+/* see header file for comments */
+template <class T>
+void StreamTransport<T>::StreamConnection::send_next_package()
+{
+    Packet packet;
+    bool found = false;
+
+    // find packet with highest priority  [locked]
+    {
+        unique_lock(out_queues_lock);
+        
+        for ( vector<OutQueue>::iterator it = out_queues.begin();
+                it != out_queues.end(); it++ )
+        {
+            if ( !it->empty() )
+            {
+                packet = it->front();
+                it->pop();
+                found = true;
+                
+                break;
+            }
+        }
+        
+        // no packets waiting --> stop sending
+        if ( ! found )
+        {
+            sending = false;
+        }
+    }
+    
+    // * send *
+    if ( found )
+    {
+        reboost::shared_buffer_t header_buf(sizeof(header_t));
+        header_t* header = (header_t*)(header_buf.mutable_data());
+        header->length = packet.size();
+        
+        packet.push_front(header_buf);
+        
+        // "convert" message to asio buffer sequence
+        vector<boost::asio::const_buffer> send_sequence(packet.length());
+        for ( int i=0; i < packet.length(); i++ )
+        {
+            shared_buffer_t b = packet.at(i);
+            send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
+        }
+        
+        // * async write *
+        boost::asio::async_write(
+                this->sock,
+                send_sequence,
+                boost::bind(
+                        &StreamTransport<T>::StreamConnection::async_write_handler,
+                        this->shared_from_this(),
+                        packet,  // makes sure our shared pointer lives long enough ;-)
+                        boost::asio::placeholders::error,
+                        boost::asio::placeholders::bytes_transferred)
+        );
+    }
+}
+/*********************
+ ** [ inner class ] **
+ *********************/
+
+
+
+// explicitly tell the compiler to create a Â»tcpÂ« (and Â»rfcommÂ«) specialization
+//   --> (needed since hpp and cpp are separated) 
+template class StreamTransport<boost::asio::ip::tcp>;
+
+#ifdef HAVE_LIBBLUETOOTH
+    template class StreamTransport<rfcomm>;
+#endif /* HAVE_LIBBLUETOOTH */
+
+
+
+/////////////////////////////////////////////////////////////////////////////////////
+    
+
+// XXX testing
+///** 
+// * Conversion between ASIO Adresses and ARIBA adresses
+// */
+///* TCP */
+//template <>
+//inline typename tcp::endpoint convert_address<tcp>( const address_v* address )
+//{
+//    tcpip_endpoint endpoint = *address;
+//    
+//    return typename tcp::endpoint(
+//        endpoint.address().asio(), endpoint.port().value()
+//    );
+//}
+//
+//template <>
+//inline EndpointPtr convert_address<tcp>(const typename tcp::endpoint& endpoint)
+//{
+//    ip_address address;
+//    address.asio(endpoint.address());
+//    tcp_port_address port;
+//    port.value(endpoint.port());
+//    
+////    new tcpip_endpoint(address, port);
+//    tcpip_endpoint xx;
+//    address_vf yy;
+//    address_v* zz = yy->clone();
+//    address_v::shared_ptr endp(zz); // XXX
+//    
+//    return endp; 
+//}
+//
+///* RFCOMM */
+//#ifdef HAVE_LIBBLUETOOTH
+//    template <>
+//    inline typename rfcomm::endpoint convert_address<rfcomm>( const address_v* address )
+//    {
+//        rfcomm_endpoint endpoint = *address;
+//        
+//        return rfcomm::endpoint(
+//            endpoint.mac().bluetooth(), endpoint.channel().value()
+//        );
+//    }
+//    
+//    template <>
+//    inline address_v::shared_ptr convert_address<rfcomm>(const typename rfcomm::endpoint& endpoint)
+//    {
+//        mac_address mac;
+//        mac.bluetooth(endpoint.address());
+//        rfcomm_channel_address channel;
+//        channel.value(endpoint.channel());
+//        
+//        address_v::shared_ptr endp((ariba::addressing::address_v*) new rfcomm_endpoint(mac, channel));
+//        
+//        return endp; 
+//    }
+//#endif /* HAVE_LIBBLUETOOTH */
+
+
+    
+/////////////////////////////////////////////////////////////////////////////////////
+    
+
+/*
+ *  Get Ethernet scope ids (for link-local)
+ */
+vector<uint64_t> get_interface_scope_ids()
+{
+    vector<uint64_t> ret;
+    
+    struct ifaddrs* ifaceBuffer = NULL;
+    void*           tmpAddrPtr  = NULL;
+    
+    int ok = getifaddrs( &ifaceBuffer );
+    if( ok != 0 ) return ret;
+
+    for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
+
+        // ignore devices that are disabled or have no ip
+        if(i == NULL) continue;
+        struct sockaddr* addr = i->ifa_addr;
+        if (addr==NULL) continue;
+
+        // only use ethX and wlanX devices
+        string device = string(i->ifa_name);
+        if ( (device.find("eth") == string::npos) &&
+              (device.find("wlan")  == string::npos) /* &&
+              (device.find("lo")  == string::npos) XXX */ )
+        {
+            continue;
+        }
+
+        // only use interfaces with ipv6 link-local addresses 
+        if (addr->sa_family == AF_INET6)
+        {
+            // convert address
+            // TODO should be possible without detour over strings
+            char straddr[INET6_ADDRSTRLEN];
+            tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
+            inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
+
+            address_v6 v6addr = address_v6::from_string(straddr);
+            if ( v6addr.is_link_local() )
+            {
+                // * append the scope_id to the return vector *
+                ret.push_back(if_nametoindex(i->ifa_name));
+            }
+
+        }
+    }
+
+    freeifaddrs(ifaceBuffer);
+    
+    return ret;
+}
+
+
+}} // namespace ariba::transport
Index: source/ariba/utility/transport/StreamTransport/StreamTransport.hpp
===================================================================
--- source/ariba/utility/transport/StreamTransport/StreamTransport.hpp	(revision 12060)
+++ source/ariba/utility/transport/StreamTransport/StreamTransport.hpp	(revision 12060)
@@ -0,0 +1,227 @@
+#ifndef STREAM_TRANSPORT_HPP_
+#define STREAM_TRANSPORT_HPP_
+
+// ariba
+#include "ariba/utility/transport/asio/unique_io_service.h"
+#include "ariba/utility/transport/messages/buffers.hpp"
+//#include "ariba/utility/logging/Logging.h"
+#include "ariba/utility/addressing2/endpoint.hpp"
+
+// ariba (transport) interfaces
+#include "ariba/utility/transport/interfaces/transport_connection.hpp"
+#include "ariba/utility/transport/interfaces/transport_protocol.hpp"
+#include "ariba/utility/transport/interfaces/transport_listener.hpp"
+
+// system
+#include <queue>
+
+// boost
+#include <boost/asio.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+
+namespace ariba {
+namespace transport {
+
+using namespace std;
+using boost::shared_ptr;
+using ariba::transport::detail::unique_io_service;
+using boost::system::error_code;
+using reboost::shared_buffer_t;
+using reboost::message_t;
+
+template <class T>
+class StreamTransport :
+    public transport_protocol,
+    public boost::enable_shared_from_this<StreamTransport<T> >
+{
+    typedef StreamTransport<T> self;
+    typedef shared_ptr<self> StreamTransportPtr;
+//use_logging_h(StreamTransport<T>)
+
+    
+    
+    
+private:
+    
+/*****************
+ ** inner class **
+ *****************/
+    class StreamConnection :
+        public transport_connection,
+        public boost::enable_shared_from_this<StreamConnection>
+    {
+    public:
+        typedef reboost::message_t Packet;
+        typedef std::queue<Packet> OutQueue;
+        
+        struct header_t
+        {
+            uint16_t length;
+        } __attribute__((packed));
+            
+        StreamConnection(boost::asio::io_service& io_service, StreamTransportPtr parent);
+        
+        virtual ~StreamConnection() {}
+        
+        /// Inherited from transport_connection
+        // Thread: ARIBA
+        virtual bool send(reboost::message_t message, uint8_t priority = 0);
+        // Thread: ARIBA
+        virtual ariba::addressing2::EndpointPtr getLocalEndpoint();
+        // Thread: ARIBA
+        virtual ariba::addressing2::EndpointPtr getRemoteEndpoint();
+        // Thread: ARIBA
+        virtual void register_communication_link(ariba::utility::LinkID* link);
+        // Thread: ARIBA
+        virtual void unregister_communication_link(ariba::utility::LinkID* link);
+        // Thread: ARIBA
+        virtual std::vector<ariba::utility::LinkID*> get_communication_links();
+        
+        // Thread: BOTH
+        virtual void terminate();
+        
+        void listen();
+        
+        void async_connect_handler(const error_code& error);
+        
+        void async_read_header_handler(const error_code& error, size_t bytes_transferred);
+        void async_read_data_handler(const error_code& error, size_t bytes_transferred);
+        
+        /*
+         * is called from asio when write operation "returns",
+         * calls private function `send_next_package()`
+         */
+        void async_write_handler(
+                reboost::shared_buffer_t packet,
+                const error_code& error,
+                size_t bytes_transferred);
+
+        
+        void enqueue_for_sending(Packet packet, uint8_t priority);
+        
+    private:
+        /*
+         * is called from `send` or `async_write_handler` to begin/keep sending
+         * sends the next message with the highest priority in this connection
+         */
+        void send_next_package();
+
+    public:
+        typename T::socket sock;
+        bool valid;
+        StreamTransportPtr parent;
+        
+        typename T::endpoint partner_endpoint;
+        typename T::endpoint local_endpoint;
+//        address_v* remote;
+//        address_v* local;
+        
+        vector<OutQueue> out_queues;     // to be locked with out_queues_lock 
+        boost::mutex out_queues_lock;
+        
+        bool sending;       // to be locked with out_queues_lock
+        
+        header_t header;
+        shared_buffer_t buffy;
+        
+    private:
+        std::vector<ariba::utility::LinkID*> communication_links;
+    };
+    typedef boost::shared_ptr<StreamConnection> ConnPtr;
+    typedef std::map<typename T::endpoint, ConnPtr> ConnectionMap;
+/*********************
+ ** [ inner class ] **
+ *********************/
+
+    
+    
+    
+public:
+	StreamTransport( const typename T::endpoint& endp );
+	virtual ~StreamTransport();
+	virtual void start();
+	virtual void stop();
+	
+	/**
+     * enqueues message for sending
+     * create new connection if necessary
+     * starts sending mechanism (if not already running)
+     */
+    void send(
+            const typename T::endpoint&,
+            reboost::message_t message,
+            uint8_t priority = 0 );
+	
+	/**
+	 * Converts address_v to tcp::endpoint and calls the real send() function
+	 */
+	virtual void send(
+	        const addressing2::EndpointPtr remote,
+	        reboost::message_t message,
+	        uint8_t priority = 0 );
+	
+	/**
+	 * calls send for each destination endpoint in `endpoints` 
+	 */
+	virtual void send(
+	        const addressing2::const_EndpointSetPtr endpoints,
+	        reboost::message_t message,
+	        uint8_t priority = 0 );
+	
+	// XXX DEPRECATED
+	virtual void terminate( addressing2::EndpointPtr remote );
+	
+	virtual void terminate( const typename T::endpoint& remote );
+	virtual void register_listener( transport_listener* listener );
+
+private:
+	void accept();
+	void async_accept_handler(ConnPtr conn, const error_code& error);
+	
+private:
+	transport_listener* listener;
+	unique_io_service u_io_service;
+	typename T::acceptor acceptor;
+	
+	ConnectionMap connections;
+	boost::mutex connections_lock;
+};
+
+
+// aliases
+//typedef StreamTransport<boost::asio::ip::tcp> tcpip;
+//#ifdef HAVE_LIBBLUETOOTH
+//    typedef StreamTransport<boost::asio::bluetooth::rfcomm> rfcomm_transport;
+//#endif /* HAVE_LIBBLUETOOTH */
+
+
+// XXX testing TODO natÃŒrlich brauchen wir das noch... oO
+///** 
+// * Conversion between ASIO Adresses and ARIBA adresses
+// */
+//template <class T>
+//typename T::endpoint convert_address(const address_v* endpoint);
+//
+//template <class T>
+//address_v::shared_ptr convert_address(const typename T::endpoint& endpoint);
+
+
+
+/**
+ *  returns a vector of (interesting) network interfaces
+ *  
+ *  [NOTE: The current implementation returns the scope_ids of
+ *  all ethX and wlanX network interfaces, to be used for
+ *  connections to link-local ipv6 addresses.]
+ *  
+ *  TODO move to ariba/communication/networkinfo/AddressDiscovery ??
+ *  
+ */
+vector<uint64_t> get_interface_scope_ids();
+
+
+}} // namespace ariba::transport
+
+#endif /* STREAM_TRANSPORT_HPP_ */
Index: source/ariba/utility/transport/StreamTransport/tcpip.cpp
===================================================================
--- source/ariba/utility/transport/tcpip/tcpip.cpp	(revision 10700)
+++ 	(revision )
@@ -1,540 +1,0 @@
-#include "tcpip.hpp"
-
-#include <boost/array.hpp>
-
-// interface discovery for link-local destinations
-#include <ifaddrs.h>
-
-namespace ariba {
-namespace transport {
-
-use_logging_cpp(tcpip)
-
-using namespace ariba::addressing;
-
-typedef boost::mutex::scoped_lock unique_lock;
-
-tcpip::tcpip( const tcp::endpoint& endp )  :
-        listener(NULL),
-        acceptor(u_io_service.get_asio_io_service(), endp)
-{
-}
-
-tcpip::~tcpip(){}
-
-void tcpip::start()
-{
-    // open server socket
-    accept();
-    
-    u_io_service.start();
-}
-
-
-void tcpip::stop()
-{
-    acceptor.close();
-    
-    u_io_service.stop();
-}
-
-
-/* see header file for comments */
-void tcpip::send(
-        const tcp::endpoint& dest_addr,
-        reboost::message_t message,
-        uint8_t priority)
-{
-    ConnPtr conn;
-    bool need_to_connect = false;
-    
-    {
-        unique_lock lock(connections_lock);
-        
-        ConnectionMap::iterator it = connections.find(dest_addr);
-        if (it == connections.end())
-        {
-            ConnPtr tmp_ptr(
-                    new tcpip_connection(
-                            u_io_service.get_asio_io_service(),
-                            shared_from_this() )
-                    );
-            conn = tmp_ptr;
-            
-            conn->partner = dest_addr;
-            conn->remote = convert_address(dest_addr);
-            
-            // Note: starting the send is the obligation of the connect_handler
-            // (avoids trying to send while not connected yet)
-            conn->sending =  true;
-            need_to_connect = true;
-            
-            ConnectionMap::value_type item(dest_addr, conn);
-            connections.insert(item);
-            
-        } else {
-            conn = it->second;
-        }
-    }
-    
-    
-    // * the actual send *
-    conn->enqueue_for_sending(message, priority);
-    
-    // if new connection connect to the other party
-    if ( need_to_connect )
-    {
-        conn->sock.async_connect(
-                dest_addr,
-                boost::bind(
-                        &tcpip_connection::async_connect_handler,
-                        conn,
-                        boost::asio::placeholders::error));
-    }
-}
-
-
-/* see header file for comments */
-void tcpip::send(
-        const address_v* remote,
-        reboost::message_t message,
-        uint8_t priority)
-{
-    send(convert_address(remote), message, priority);
-}
-
-
-/* see header file for comments */
-void tcpip::send(
-        const endpoint_set& endpoints,
-        reboost::message_t message,
-        uint8_t priority )
-{
-    // network interfaces scope_ids, for link-local connections (lazy initialization)
-    vector<uint64_t> scope_ids;
-    
-    // send a message to each combination of address-address and port
-    BOOST_FOREACH( const ip_address address, endpoints.ip ) {
-        BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
-            tcp::endpoint endp(address.asio(), port.asio());
-            
-            // special treatment for link local addresses
-            //   ---> send over all (suitable) interfaces
-            if ( endp.address().is_v6() )
-            {
-                boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
-                
-                if ( v6_addr.is_link_local() )
-                {
-                    // initialize scope_ids
-                    if ( scope_ids.size() == 0 )
-                        scope_ids = get_interface_scope_ids();
-                    
-                    BOOST_FOREACH ( uint64_t id, scope_ids )
-                    {                        
-                        v6_addr.scope_id(id);
-                        endp.address(v6_addr);
-    
-                        logging_debug("------> SEND TO (link-local): " << endp);
-                        // * send *
-                        send(endp, message, priority);
-                    }
-                }
-                
-                continue;
-            }
-            
-            // * send *
-            send(endp, message, priority);
-        }
-    }
-}
-
-
-void tcpip::register_listener( transport_listener* listener )
-{
-    this->listener = listener;
-}
-
-
-void tcpip::terminate( const address_v* remote )
-{
-    terminate(convert_address(remote));
-}
-
-void tcpip::terminate( const tcp::endpoint& remote )
-{
-    ConnPtr conn;
-    
-    // find and forget connection
-    {
-        unique_lock lock(connections_lock);
-        
-        ConnectionMap::iterator it = connections.find(remote);
-        if (it == connections.end())
-        {
-            return;
-        }
-        
-        conn = it->second;
-        
-        connections.erase(it);
-    }
-
-    // close connection
-    boost::system::error_code ec;
-    conn->sock.shutdown(tcp::socket::shutdown_both, ec);
-    conn->sock.close(ec);
-}
-
-
-/* private */
-void tcpip::accept()
-{
-    // create new connection object
-    ConnPtr conn(
-            new tcpip_connection(
-                    u_io_service.get_asio_io_service(),
-                    shared_from_this()
-            )
-    );
-    
-    // wait for incoming connection
-    acceptor.async_accept(
-            conn->sock,
-            boost::bind(&self::async_accept_handler,
-                    this->shared_from_this(),
-                    conn,
-                    boost::asio::placeholders::error)
-    );
-}
-
-void tcpip::async_accept_handler(ConnPtr conn, const error_code& error)
-{
-    if ( ! error )
-    {
-        conn->partner = conn->sock.remote_endpoint();
-        conn->remote = convert_address(conn->partner);
-        conn->local = convert_address(conn->sock.local_endpoint());
-        
-        {
-            unique_lock lock(connections_lock);
-            
-            ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
-            connections.insert(item);
-        }
-        
-        // read
-        conn->listen();
-    }
-    
-    // accept further connections
-    accept();
-}
-
-inline tcp::endpoint tcpip::convert_address( const address_v* address )
-{
-    tcpip_endpoint endpoint = *address;
-    
-    return tcp::endpoint(
-        endpoint.address().asio(), endpoint.port().value()
-    );
-}
-
-
-inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint)
-{
-    ip_address address;
-    address.asio(endpoint.address());
-    tcp_port_address port;
-    port.value(endpoint.port());
-    return tcpip_endpoint(address, port);
-}
-
-
-vector<uint64_t> tcpip::get_interface_scope_ids()
-{
-    vector<uint64_t> ret;
-    
-    struct ifaddrs* ifaceBuffer = NULL;
-    void*           tmpAddrPtr  = NULL;
-    
-    int ok = getifaddrs( &ifaceBuffer );
-    if( ok != 0 ) return ret;
-
-    for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
-
-        // ignore devices that are disabled or have no ip
-        if(i == NULL) continue;
-        struct sockaddr* addr = i->ifa_addr;
-        if (addr==NULL) continue;
-
-        // only use ethX and wlanX devices
-        string device = string(i->ifa_name);
-        if ( (device.find("eth") == string::npos) &&
-              (device.find("wlan")  == string::npos) /* &&
-              (device.find("lo")  == string::npos) XXX */ )
-        {
-            continue;
-        }
-
-        // only use interfaces with ipv6 link-local addresses 
-        if (addr->sa_family == AF_INET6)
-        {
-            // convert address
-            // TODO should be possible without detour over strings
-            char straddr[INET6_ADDRSTRLEN];
-            tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
-            inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
-
-            address_v6 v6addr = address_v6::from_string(straddr);
-            if ( v6addr.is_link_local() )
-            {
-                // * append the scope_id to the return vector *
-                ret.push_back(if_nametoindex(i->ifa_name));
-            }
-
-        }
-    }
-
-    freeifaddrs(ifaceBuffer);
-    
-    return ret;
-}
-
-
-/*****************
- ** inner class **
- *****************/
-
-tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent)  :
-        sock(io_service),
-        valid(true),
-        parent(parent),
-        out_queues(8), //TODO How much priorities shall we have?
-        sending(false)
-{
-        header.length = 0;
-        header.prot = 0;
-}
-
-/*-------------------------------------------
- | implement transport_connection interface |
- -------------------------------------------*/
-void tcpip::tcpip_connection::send(
-        reboost::message_t message,
-        uint8_t priority)
-{
-    enqueue_for_sending(message, priority);
-}
-
-
-address_vf tcpip::tcpip_connection::getLocalEndpoint()
-{
-    return local;
-}
-
-
-address_vf tcpip::tcpip_connection::getRemoteEndpoint()
-{
-    return remote;
-}
-
-
-void tcpip::tcpip_connection::terminate()
-{
-    parent->terminate(partner);
-}
-
-
-/*------------------------------
- | things we defined ourselves |
- ------------------------------*/
-void tcpip::tcpip_connection::async_connect_handler(const error_code& error)
-{
-    if (error)
-    {
-        parent->terminate(partner);
-
-        return;
-    }
-    
-    // save address in ariba format
-    local = parent->convert_address(sock.local_endpoint());
-    
-    // Note: sending has to be true at this point
-    send_next_package();
-    
-    listen();
-}
-
-
-void tcpip::tcpip_connection::listen()
-{
-    boost::asio::async_read(
-            this->sock,
-            boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
-            boost::bind(
-                    &tcpip::tcpip_connection::async_read_header_handler,
-                    this->shared_from_this(),
-                    boost::asio::placeholders::error,
-                    boost::asio::placeholders::bytes_transferred
-            )
-    );
-}
-
-
-void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
-{
-    if (error)
-    {
-        parent->terminate(partner);
-
-        return;
-    }
-
-    // convert byte order
-    header.length = ntohl(header.length);
-    header.length -= 2;  // XXX protlib
-    
-    assert(header.length > 0);
-    
-    // new buffer for the new packet
-    buffy = shared_buffer_t(header.length);
-
-    // * read data *
-    boost::asio::async_read(
-            this->sock,
-            boost::asio::buffer(buffy.mutable_data(), buffy.size()),
-            boost::bind(
-                    &tcpip::tcpip_connection::async_read_data_handler,
-                    this->shared_from_this(),
-                    boost::asio::placeholders::error,
-                    boost::asio::placeholders::bytes_transferred
-            )
-    );
-}
-
-void tcpip::tcpip_connection::async_read_data_handler(
-        const error_code& error, size_t bytes_transferred)
-{
-    if (error)
-    {
-        parent->terminate(partner);
-
-        return;
-    }
-    
-    message_t msg;
-    msg.push_back(buffy);
-    buffy = shared_buffer_t();
-
-    if ( parent->listener )
-        parent->listener->receive_message(shared_from_this(), msg);
-    
-    listen();
-}
-
-/* see header file for comments */
-void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
-{
-    if ( error )
-    {        
-        // remove this connection
-        parent->terminate(partner); 
-
-        return;
-    }
-    
-    send_next_package();
-}
-
-
-
-void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority)
-{
-    bool restart_sending = false;
-    
-    // enqueue packet  [locked]
-    {
-        unique_lock(out_queues_lock);
-        
-        assert( priority < out_queues.size() );
-        out_queues[priority].push(packet);
-        
-        if ( ! sending )
-        {
-            restart_sending = true;
-            sending = true;
-        }
-    }
-    
-    // if sending was stopped, we have to restart it here
-    if ( restart_sending )
-    {
-        send_next_package();
-    }
-}
-
-/* see header file for comments */
-void tcpip::tcpip_connection::send_next_package()
-{
-    Packet packet;
-    bool found = false;
-
-    // find packet with highest priority  [locked]
-    {
-        unique_lock(out_queues_lock);
-        
-        for ( vector<OutQueue>::iterator it = out_queues.begin();
-                it != out_queues.end(); it++ )
-        {
-            if ( !it->empty() )
-            {
-                packet = it->front();
-                it->pop();
-                found = true;
-                
-                break;
-            }
-        }
-        
-        // no packets waiting --> stop sending
-        if ( ! found )
-        {
-            sending = false;
-        }
-    }
-    
-    // * send *
-    if ( found )
-    {
-        reboost::shared_buffer_t header_buf(sizeof(header_t));
-        header_t* header = (header_t*)(header_buf.mutable_data());
-        header->length = htonl(packet.size()+2);  // XXX protlib
-        
-        packet.push_front(header_buf);
-        
-        // "convert" message to asio buffer sequence
-        vector<boost::asio::const_buffer> send_sequence(packet.length());
-        for ( int i=0; i < packet.length(); i++ )
-        {
-            shared_buffer_t b = packet.at(i);
-            send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
-        }
-        
-        // * async write *
-        boost::asio::async_write(
-                this->sock,
-                send_sequence,
-                boost::bind(
-                        &tcpip::tcpip_connection::async_write_handler,
-                        this->shared_from_this(),
-                        packet,  // makes sure our shared pointer lives long enough ;-)
-                        boost::asio::placeholders::error,
-                        boost::asio::placeholders::bytes_transferred)
-        );
-    }
-}
-
-}} // namespace ariba::transport
Index: source/ariba/utility/transport/StreamTransport/tcpip.hpp
===================================================================
--- source/ariba/utility/transport/tcpip/tcpip.hpp	(revision 10700)
+++ 	(revision )
@@ -1,173 +1,0 @@
-#ifndef TCPIP_HPP_
-#define TCPIP_HPP_
-
-#include "ariba/utility/transport/transport.hpp"
-#include "ariba/utility/transport/asio/unique_io_service.h"
-#include "ariba/utility/transport/transport_connection.hpp"
-#include "ariba/utility/addressing/tcpip_endpoint.hpp"
-#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <queue>
-#include "ariba/utility/transport/messages/buffers.hpp"
-#include "ariba/utility/logging/Logging.h"
-
-namespace ariba {
-namespace transport {
-
-using namespace std;
-using ariba::transport::detail::unique_io_service;
-using ariba::addressing::tcpip_endpoint;
-using boost::asio::ip::tcp;
-using boost::asio::ip::address_v6;
-using boost::system::error_code;
-using reboost::shared_buffer_t;
-using reboost::message_t;
-
-class tcpip;
-typedef boost::shared_ptr<tcpip> TcpIpPtr;
-
-class tcpip :
-    public transport_protocol,
-    public boost::enable_shared_from_this<tcpip>
-{
-    typedef tcpip self;
-use_logging_h(tcpip)
-
-private:
-    class tcpip_connection :
-        public transport_connection,
-        public boost::enable_shared_from_this<tcpip_connection>
-    {
-    public:
-        typedef reboost::message_t Packet;
-        typedef std::queue<Packet> OutQueue;
-        
-        struct header_t
-        {
-            uint32_t length;
-            uint16_t prot;  // XXX protlib
-        } __attribute__((packed));
-            
-        tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent);
-        
-        /// Inherited from transport_connection
-        virtual void send(reboost::message_t message, uint8_t priority = 0);
-        virtual address_vf getLocalEndpoint();
-        virtual address_vf getRemoteEndpoint();
-        virtual void terminate();
-        
-        void listen();
-        
-        void async_connect_handler(const error_code& error);
-        
-        void async_read_header_handler(const error_code& error, size_t bytes_transferred);
-        void async_read_data_handler(const error_code& error, size_t bytes_transferred);
-        
-        /*
-         * is called from asio when write operation "returns",
-         * calls private function `send_next_package()`
-         */
-        void async_write_handler(
-                reboost::shared_buffer_t packet,
-                const error_code& error,
-                size_t bytes_transferred);
-
-        
-        void enqueue_for_sending(Packet packet, uint8_t priority);
-        
-    private:
-        /*
-         * is called from `send` or `async_write_handler` to begin/keep sending
-         * sends the next message with the highest priority in this connection
-         */
-        void send_next_package();
-
-
-    public:
-        tcp::socket sock;
-        bool valid;
-        TcpIpPtr parent;
-        
-        tcp::endpoint partner;
-        tcpip_endpoint remote;
-        tcpip_endpoint local;
-        
-        vector<OutQueue> out_queues;     // to be locked with out_queues_lock 
-        boost::mutex out_queues_lock;
-        
-        bool sending;       // to be locked with out_queues_lock
-        
-        header_t header;
-        shared_buffer_t buffy;
-    };
-    typedef boost::shared_ptr<tcpip_connection> ConnPtr;
-    typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap;
-    
-public:
-	tcpip( const tcp::endpoint& endp );
-	virtual ~tcpip();
-	virtual void start();
-	virtual void stop();
-	
-	/**
-     * enqueues message for sending
-     * create new connection if necessary
-     * starts sending mechanism (if not already running)
-     */
-    void send(
-            const tcp::endpoint&,
-            reboost::message_t message,
-            uint8_t priority = 0 );
-	
-	/**
-	 * Converts address_v to tcp::endpoint and calls the real send() function
-	 */
-	virtual void send(
-	        const address_v* remote,
-	        reboost::message_t message,
-	        uint8_t priority = 0 );
-	
-	/**
-	 * calls send for each destination endpoint in `endpoint_set& endpoints` 
-	 */
-	virtual void send(
-	        const endpoint_set& endpoints,
-	        reboost::message_t message,
-	        uint8_t priority = 0 );
-	
-	virtual void terminate( const address_v* remote );
-	virtual void terminate( const tcp::endpoint& remote );
-	virtual void register_listener( transport_listener* listener );
-
-	
-    /**
-     *  returns a vector of (interesting) network interfaces
-     *  
-     *  [NOTE: The current implementation returns the scope_ids of
-     *  all ethX and wlanX network interfaces, to be used for
-     *  connections to link-local ipv6 addresses.]
-     *  
-     *  TODO move to ariba/communication/networkinfo/AddressDiscovery ??
-     *  
-     */
-    static vector<uint64_t> get_interface_scope_ids();
-
-private:
-	void accept();
-	void async_accept_handler(ConnPtr conn, const error_code& error);
-	tcp::endpoint convert_address(const address_v* endpoint);
-	tcpip_endpoint convert_address(const tcp::endpoint& endpoint);
-	
-private:
-	transport_listener* listener;
-	unique_io_service u_io_service;
-	tcp::acceptor acceptor;
-	
-	ConnectionMap connections;
-	boost::mutex connections_lock;
-};
-
-}} // namespace ariba::transport
-
-#endif /* TCPIP_HPP_ */
