Changeset 10653 for source


Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (13 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

Location:
source
Files:
27 added
9 deleted
30 edited

Legend:

Unmodified
Added
Removed
  • source/Makefile.am

    r10652 r10653  
    1 SUBDIRS = ariba
     1SUBDIRS = ariba services
  • source/ariba/Makefile.am

    r7744 r10653  
    1818# project version number!!
    1919
    20 libariba_la_LDFLAGS = -version-info 0:0:0
     20libariba_la_LDFLAGS = -version-info 1:0:0
    2121
    2222# compiler flags ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     
    144144  overlay/messages/JoinReply.cpp \
    145145  overlay/messages/JoinRequest.cpp \
    146   overlay/messages/DHTMessage.cpp \
    147146  overlay/messages/OverlayMsg.cpp
    148147
     
    150149  overlay/messages/JoinReply.h \
    151150  overlay/messages/JoinRequest.h \
    152   overlay/messages/DHTMessage.h\
    153151  overlay/messages/OverlayMsg.h
    154152
     
    278276libariba_la_SOURCES += \
    279277  utility/transport/tcpip/tcpip.cpp \
    280   utility/transport/tcpip/protlib/timer_module.cpp \
    281   utility/transport/tcpip/protlib/setuid.cpp \
    282   utility/transport/tcpip/protlib/queuemanager.cpp \
    283   utility/transport/tcpip/protlib/messages.cpp \
    284   utility/transport/tcpip/protlib/fqueue.cpp \
    285   utility/transport/tcpip/protlib/fastqueue.c \
    286   utility/transport/tcpip/protlib/eclock_gettime.c \
    287   utility/transport/tcpip/protlib/tp_over_udp.cpp \
    288   utility/transport/tcpip/protlib/connectionmap_uds.cpp \
    289   utility/transport/tcpip/protlib/network_message.cpp \
    290   utility/transport/tcpip/protlib/threadsafe_db.cpp \
    291   utility/transport/tcpip/protlib/timer.cpp \
    292   utility/transport/tcpip/protlib/address.cpp \
    293   utility/transport/tcpip/protlib/connectionmap.cpp \
    294   utility/transport/tcpip/protlib/tp.cpp \
    295   utility/transport/tcpip/protlib/tp_over_tcp.cpp \
    296   utility/transport/tcpip/protlib/configuration.cpp \
    297   utility/transport/tcpip/protlib/ie.cpp \
    298   utility/transport/tcpip/protlib/threads.cpp \
    299   utility/transport/tcpip/protlib/logfile.cpp \
    300278  utility/transport/transport_peer.cpp \
    301   utility/transport/rfcomm/rfcomm.cpp \
    302   utility/transport/asio/asio_io_service.cpp
     279  utility/transport/rfcomm/rfcomm_transport.cpp \
     280  utility/transport/asio/unique_io_service.cpp \
     281  utility/transport/messages/buffer.cpp \
     282  utility/transport/messages/message.cpp \
     283  utility/transport/messages/shared_buffer.cpp
    303284
    304285nobase_libariba_la_HEADERS += \
    305286  utility/transport/test_transport.hpp \
    306287  utility/transport/tcpip/tcpip.hpp \
     288  utility/transport/transport_connection.hpp \
     289  utility/transport/transport_listener.hpp \
    307290  utility/transport/transport_peer.hpp \
    308291  utility/transport/transport_protocol.hpp \
    309   utility/transport/rfcomm/rfcomm.hpp \
     292  utility/transport/rfcomm/rfcomm_transport.hpp \
     293  utility/transport/rfcomm/bluetooth_endpoint.hpp \
     294  utility/transport/rfcomm/bluetooth_rfcomm.hpp \
    310295  utility/transport/transport.hpp \
    311   utility/transport/asio/bluetooth_endpoint.hpp \
    312   utility/transport/asio/rfcomm.hpp \
    313   utility/transport/transport_listener.hpp \
    314   utility/transport/asio/asio_io_service.h \
    315   utility/transport/tcpip/protlib/threadsafe_db.h \
    316   utility/transport/tcpip/protlib/configuration.h \
    317   utility/transport/tcpip/protlib/ie.h \
    318   utility/transport/tcpip/protlib/llhashers.h \
    319   utility/transport/tcpip/protlib/fqueue.h \
    320   utility/transport/tcpip/protlib/assocdata_uds.h \
    321   utility/transport/tcpip/protlib/address.h \
    322   utility/transport/tcpip/protlib/logfile.h \
    323   utility/transport/tcpip/protlib/timer.h \
    324   utility/transport/tcpip/protlib/queuemanager.h \
    325   utility/transport/tcpip/protlib/messages.h \
    326   utility/transport/tcpip/protlib/assocdata.h \
    327   utility/transport/tcpip/protlib/protlib_types.h \
    328   utility/transport/tcpip/protlib/tp_over_tcp.h \
    329   utility/transport/tcpip/protlib/tp_over_udp.h \
    330   utility/transport/tcpip/protlib/tp.h \
    331   utility/transport/tcpip/protlib/threads.h \
    332   utility/transport/tcpip/protlib/connectionmap.h \
    333   utility/transport/tcpip/protlib/timer_module.h \
    334   utility/transport/tcpip/protlib/fastqueue.h \
    335   utility/transport/tcpip/protlib/tperror.h \
    336   utility/transport/tcpip/protlib/network_message.h \
    337   utility/transport/tcpip/protlib/setuid.h \
    338   utility/transport/tcpip/protlib/cleanuphandler.h \
    339   utility/transport/tcpip/protlib/connectionmap_uds.h
     296  utility/transport/asio/unique_io_service.h \
     297  utility/transport/messages/buffer.hpp \
     298  utility/transport/messages/buffers.hpp \
     299  utility/transport/messages/message.hpp \
     300  utility/transport/messages/shared_buffer.hpp
    340301
    341302#------------> utility :: messages
  • source/ariba/Node.cpp

    r7532 r10653  
    177177}
    178178
     179NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
     180        const LinkProperties& req) {
     181   
     182    return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid);
     183}
     184
     185
    179186seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) {
    180187        return base_overlay->sendMessage((Message*) msg, lnk);
     
    209216}
    210217
    211 // service directory
    212 
    213 void Node::put( const Data& key, const Data& value, uint16_t ttl, bool replace ) {
    214         base_overlay->dhtPut(key,value,ttl,replace);
    215 }
    216 
    217 void Node::get( const Data& key, const ServiceID& sid ) {
    218         base_overlay->dhtGet(key,sid);
    219 }
    220 
    221218// @see Module.h
    222219string Node::getName() const {
  • source/ariba/Node.h

    r9684 r10653  
    242242
    243243        /**
     244         * like the above function, but sends the message to the closest directly known node
     245         * to the specified address
     246         */
     247    NodeID sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid,
     248            const LinkProperties& req = LinkProperties::DEFAULT);
     249
     250        /**
    244251         * Sends a message via an established link. If reliable transport was
    245252         * selected, the method returns a sequence number and a communication event
     
    281288         */
    282289        bool unbind(CommunicationListener* listener, const ServiceID& sid);
    283 
    284         /**
    285          * Adds a key value pair to the DHT
    286          *
    287          * @param key The key data
    288          * @param value The value data
    289          * @param ttl The time to live in seconds
    290          */
    291         void put( const Data& key, const Data& value, uint16_t ttl, bool replace = false);
    292 
    293         /**
    294          * Queries for values stored in the DHT. Fires an communication event when
    295          * values arrive.
    296          *
    297          * @param key The key data
    298          * @param sid The service that is requesting the values
    299          */
    300         void get( const Data& key, const ServiceID& sid );
    301 
    302290
    303291        //-------------------------------------------------------------------------
  • source/ariba/SideportListener.h

    r9684 r10653  
    4747#include "CommunicationListener.h"
    4848
    49 using std::cout;
    5049using std::map;
    5150using std::vector;
  • source/ariba/communication/BaseCommunication.cpp

    r9322 r10653  
    4141#include "networkinfo/AddressDiscovery.h"
    4242#include "ariba/utility/types/PeerID.h"
     43#include <boost/function.hpp>
    4344
    4445#ifdef UNDERLAY_OMNET
     
    283284SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
    284285
    285 class DispatchMsg {
    286 public:
    287         DispatchMsg() : local(NULL), remote(NULL), message(NULL) {}
    288         address_v* local;
    289         address_v* remote;
    290         Message* message;
    291 };
    292 
    293286/// called when a system event is emitted by system queue
    294287void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
     
    297290        if ( event.getType() == MessageDispatchEvent ){
    298291                logging_debug( "Forwarding message receiver" );
    299                 DispatchMsg* dmsg = event.getData<DispatchMsg>();
    300                 Message* msg = dmsg->message;
    301                 receiveMessage(msg, dmsg->local, dmsg->remote);
    302                 msg->dropPayload();
    303                 delete dmsg->local;
    304                 delete dmsg->remote;
    305                 delete msg;
    306                 delete dmsg;
    307         }
    308 }
    309 
    310 /// called when a message is received from transport_peer
    311 void BaseCommunication::receive_message(transport_protocol* transport,
    312         const address_vf local, const address_vf remote, const uint8_t* data,
    313         size_t size) {
    314 
    315 //      logging_debug( "Dispatching message" );
    316 
    317         // convert data
    318         Data data_( const_cast<uint8_t*>(data), size * 8 );
    319         DispatchMsg* dmsg = new DispatchMsg();
    320 
    321         Message* msg = new Message(data_);
    322         dmsg->local = local->clone();
    323         dmsg->remote = remote->clone();
    324         dmsg->message = msg;
    325 
    326         SystemQueue::instance().scheduleEvent(
    327                 SystemEvent( this, MessageDispatchEvent, dmsg )
    328         );
    329 }
    330 
    331 /// handles a message from the underlay transport
    332 void BaseCommunication::receiveMessage(const Message* message,
    333         const address_v* local, const address_v* remote ){
    334 
     292                boost::function0<void>* handler = event.getData< boost::function0<void> >();
     293                (*handler)();
     294                delete handler;
     295        }
     296}
     297
     298/**
     299 * called within the ASIO thread
     300 * when a message is received from underlay transport
     301 */
     302void BaseCommunication::receive_message(transport_connection::sptr connection,
     303        reboost::message_t msg) {
     304
     305        logging_debug( "Dispatching message" );
     306       
     307    boost::function0<void>* handler = new boost::function0<void>(
     308            boost::bind(
     309                    &BaseCommunication::receiveMessage,
     310                    this,
     311                    connection,
     312                    msg)
     313    );
     314   
     315    SystemQueue::instance().scheduleEvent(
     316        SystemEvent(this, MessageDispatchEvent, handler)
     317    );
     318}
     319
     320/**
     321 * called within the ARIBA thread (System Queue)
     322 * when a message is received from underlay transport
     323 */
     324void BaseCommunication::receiveMessage(transport_connection::sptr connection,
     325        reboost::message_t message)
     326{
     327   
     328    //// Adapt to old message system ////
     329    // Copy data
     330    size_t bytes_len = message.size();
     331    uint8_t* bytes = new uint8_t[bytes_len];
     332    message.read(bytes, 0, bytes_len);
     333   
     334    Data data(bytes, bytes_len * 8);
     335   
     336    Message legacy_message;
     337    legacy_message.setPayload(data);
     338   
     339   
     340   
    335341        /// decapsulate message
    336         AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();
     342        AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
    337343        logging_debug( "Receiving message of type " << msg->getTypeString() );
    338344
     
    379385                        LinkID localLink  = LinkID::create();
    380386                        LinkID remoteLink = msg->getLocalLink();
    381                         logging_debug( "local=" << local->to_string()
    382                                 << " remote=" << remote->to_string()
     387                        logging_debug(
     388                                "local=" << connection->getLocalEndpoint()->to_string()
     389                                << " remote=" << connection->getRemoteEndpoint()->to_string()
    383390                        );
    384391
     
    386393                        bool allowlink = true;
    387394                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
    388                                 allowlink &= i->onLinkRequest( localLink, local, remote );
     395                                allowlink &= i->onLinkRequest( localLink,
     396                                        connection->getLocalEndpoint(),
     397                                        connection->getRemoteEndpoint());
    389398                        }
    390399
     
    400409                        ld->localLink = localLink;
    401410                        ld->remoteLink = remoteLink;
    402                         ld->localLocator = local->clone();
    403                         ld->remoteLocator = remote->clone();
     411                        ld->localLocator = connection->getLocalEndpoint()->clone();
     412                        ld->remoteLocator = connection->getRemoteEndpoint()->clone();
     413                        ld->connection = connection;
    404414                        ld->remoteEndpoint = msg->getLocalDescriptor();
    405415                        add_endpoint(ld->remoteLocator);
     
    409419                                ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    410420                        localDescriptor.getEndpoints().add(
    411                                 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
     421                                connection->getLocalEndpoint(),
     422                                endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    412423
    413424                        // link is now up-> add it
     
    459470                        }
    460471
     472                        // store the connection
     473                        ld.connection = connection;
     474                       
    461475                        // set remote locator and link id
    462476                        ld.remoteLink = msg->getLocalLink();
    463                         ld.remoteLocator = remote->clone();
     477                        ld.remoteLocator = connection->getRemoteEndpoint()->clone();
    464478                        ld.remoteEndpoint.getEndpoints().add(
    465479                                                        msg->getLocalDescriptor().getEndpoints(),
     
    536550                        // update the remote locator
    537551                        const address_v* oldremote = linkDesc.remoteLocator;
    538                         linkDesc.remoteLocator = remote->clone();
     552                        linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
    539553
    540554                        // inform the listeners (local link has _not_ changed!)
     
    747761
    748762/// sends a message to all end-points in the end-point descriptor
    749 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) {
    750         Data data = data_serialize( message, DEFAULT_V );
    751         transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8);
    752         data.release();
     763void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
     764        Data data = data_serialize(legacy_message, DEFAULT_V);
     765       
     766        //// Adapt to new message system ////
     767        // transfer data buffer ownership to the shared_buffer
     768    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     769       
     770        reboost::message_t message;
     771        message.push_back(buf);
     772       
     773        transport->send(endpoint.getEndpoints(), message);
    753774}
    754775
    755776/// sends a message to the remote locator inside the link descriptor
    756 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {
     777void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
    757778        if (desc.remoteLocator==NULL) return;
    758         Data data = data_serialize( message, DEFAULT_V );
    759         transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8);
    760         data.release();
     779       
     780        Data data = data_serialize(legacy_message, DEFAULT_V);
     781   
     782    //// Adapt to new message system ////
     783    // transfer data buffer ownership to the shared_buffer
     784    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     785   
     786    reboost::message_t message;
     787    message.push_back(buf);
     788   
     789        desc.connection->send(message);
    761790}
    762791
  • source/ariba/communication/BaseCommunication.h

    r9694 r10653  
    6060#include "ariba/utility/addressing/addressing.hpp"
    6161#include "ariba/utility/transport/transport.hpp"
     62#include "ariba/utility/transport/transport_connection.hpp"
    6263
    6364// communication
     
    190191        virtual void handleSystemEvent(const SystemEvent& event);
    191192
    192         /// called when a message is received form transport_peer
    193         virtual void receive_message(transport_protocol* transport,
    194                 const address_vf local, const address_vf remote, const uint8_t* data,
    195                 size_t size);
     193        /**
     194         * called within the ASIO thread
     195         * when a message is received from underlay transport
     196         */
     197        virtual void receive_message(transport_connection::sptr connection,
     198                reboost::message_t msg);
    196199
    197200protected:
    198201
    199         /// handle received message from a transport module
    200         void receiveMessage(const Message* message,
    201                 const address_v* local, const address_v* remote );
     202        /**
     203         * called within the ARIBA thread (System Queue)
     204         * when a message is received from underlay transport
     205         */
     206        void receiveMessage(transport_connection::sptr connection,
     207                reboost::message_t msg);
    202208
    203209        /// called when a network interface change happens
     
    250256                /// flag, whether this link is up
    251257                bool up;
     258               
     259                /// connection if link is up
     260                transport_connection::sptr connection;
    252261        };
    253262
  • source/ariba/communication/networkinfo/AddressDiscovery.cpp

    r8620 r10653  
    129129                        ip_address ip = straddr;
    130130                        if (ip.is_loopback()) continue;
    131                         if (ip.is_link_local()) continue;
     131//                      if (ip.is_link_local()) continue;
    132132                        address_vf vf = ip;
    133133                        endpoints.add( vf );
  • source/ariba/communication/networkinfo/NetworkInformation.h

    r3690 r10653  
    4040#define __NETWORK_INFORMATION_H
    4141
     42#include <unistd.h>
    4243#include <vector>
    4344#include <string>
  • source/ariba/overlay/BaseOverlay.cpp

    r10576 r10653  
    5151
    5252#include "ariba/overlay/messages/OverlayMsg.h"
    53 #include "ariba/overlay/messages/DHTMessage.h"
    5453#include "ariba/overlay/messages/JoinRequest.h"
    5554#include "ariba/overlay/messages/JoinReply.h"
     
    6665#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
    6766
    68 class ValueEntry {
    69 public:
    70         ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
    71                 last_change(time(NULL)), value(value.clone()) {
    72         }
    73 
    74         ValueEntry( const ValueEntry& value ) :
    75                 ttl(value.ttl), last_update(value.last_update),
    76                 last_change(value.last_change), value(value.value.clone()) {
    77         }
    78 
    79         ~ValueEntry()  {
    80                 value.release();
    81         }
    82 
    83         void refresh() {
    84                 last_update = time(NULL);
    85         }
    86 
    87         void set_value( const Data& value ) {
    88                 this->value.release();
    89                 this->value = value.clone();
    90                 this->last_change = time(NULL);
    91                 this->last_update = time(NULL);
    92         }
    93 
    94         Data get_value() const {
    95                 return value;
    96         }
    97 
    98         uint16_t get_ttl() const {
    99                 return ttl;
    100         }
    101 
    102         void set_ttl( uint16_t ttl ) {
    103                 this->ttl = ttl;
    104         }
    105 
    106         bool is_ttl_elapsed() const {
    107                 // is persistent? yes-> always return false
    108                 if (ttl==0) return false;
    109                 // return true, if ttl is elapsed
    110                 return ( difftime( time(NULL), this->last_update ) > ttl );
    111         }
    112 
    113 private:
    114         uint16_t ttl;
    115         time_t last_update;
    116         time_t last_change;
    117         Data value;
    118 };
    119 
    120 class DHTEntry {
    121 public:
    122         Data key;
    123         vector<ValueEntry> values;
    124 
    125         vector<Data> get_values() {
    126                 vector<Data> vect;
    127                 BOOST_FOREACH( ValueEntry& e, values )
    128                         vect.push_back( e.get_value() );
    129                 return vect;
    130         }
    131 
    132         void erase_expired_entries() {
    133                 for (vector<ValueEntry>::iterator i = values.begin();
    134                                 i != values.end(); i++ )
    135                         if (i->is_ttl_elapsed())
    136                                 i = values.erase(i)-1;
    137         }
    138 };
    139 
    140 class DHT {
    141 public:
    142         typedef vector<DHTEntry> Entries;
    143         typedef vector<ValueEntry> Values;
    144         Entries entries;
    145         static const bool verbose = false;
    146 
    147         static bool equals( const Data& lhs, const Data& rhs ) {
    148                 if (rhs.getLength()!=lhs.getLength()) return false;
    149                 for (size_t i=0; i<lhs.getLength()/8; i++)
    150                         if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
    151                 return true;
    152         }
    153 
    154         void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
    155                 cleanup();
    156 
    157                 // find entry
    158                 for (size_t i=0; i<entries.size(); i++) {
    159                         DHTEntry& entry = entries.at(i);
    160 
    161                         // check if key is already known
    162                         if ( equals(entry.key, key) ) {
    163 
    164                                 // check if value is already in values list
    165                                 for (size_t j=0; j<entry.values.size(); j++) {
    166                                         // found value already? yes-> refresh ttl
    167                                         if ( equals(entry.values[j].get_value(), value) ) {
    168                                                 entry.values[j].refresh();
    169                                                 if (verbose)
    170                                                         std::cout << "DHT: Republished value. Refreshing value timestamp."
    171                                                                 << std::endl;
    172                                                 return;
    173                                         }
    174                                 }
    175 
    176                                 // new value-> add to entry
    177                                 if (verbose)
    178                                         std::cout << "DHT: Added value to "
    179                                                 << " key=" << key << " with value=" << value << std::endl;
    180                                 entry.values.push_back( ValueEntry( value ) );
    181                                 entry.values.back().set_ttl(ttl);
    182                                 return;
    183                         }
    184                 }
    185 
    186                 // key is unknown-> add key value pair
    187                 if (verbose)
    188                         std::cout << "DHT: New key value pair "
    189                                 << " key=" << key << " with value=" << value << std::endl;
    190 
    191                 // add new entry
    192                 entries.push_back( DHTEntry() );
    193                 DHTEntry& entry = entries.back();
    194                 entry.key = key.clone();
    195                 entry.values.push_back( ValueEntry(value) );
    196                 entry.values.back().set_ttl(ttl);
    197         }
    198 
    199         vector<Data> get( const Data& key ) {
    200                 cleanup();
    201                 // find entry
    202                 for (size_t i=0; i<entries.size(); i++) {
    203                         DHTEntry& entry = entries.at(i);
    204                         if ( equals(entry.key,key) )
    205                                 return entry.get_values();
    206                 }
    207                 return vector<Data>();
    208         }
    209 
    210         bool remove( const Data& key ) {
    211                 cleanup();
    212 
    213                 // find entry
    214                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    215                         DHTEntry& entry = *i;
    216 
    217                         // found? yes-> delete entry
    218                         if ( equals(entry.key, key) ) {
    219                                 entries.erase(i);
    220                                 return true;
    221                         }
    222                 }
    223                 return false;
    224         }
    225 
    226         bool remove( const Data& key, const Data& value ) {
    227                 cleanup();
    228                 // find entry
    229                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    230                         DHTEntry& entry = *i;
    231 
    232                         // found? yes-> try to find value
    233                         if ( equals(entry.key, key) ) {
    234                                 for (Values::iterator j = entry.values.begin();
    235                                                 j != entry.values.end(); j++) {
    236 
    237                                         // value found? yes-> delete
    238                                         if (equals(j->get_value(), value)) {
    239                                                 j = entry.values.erase(j)-1;
    240                                                 return true;
    241                                         }
    242                                 }
    243                         }
    244                 }
    245                 return false;
    246         }
    247 
    248         void cleanup() {
    249                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    250                         DHTEntry& entry = *i;
    251                         entry.erase_expired_entries();
    252                         if (entry.values.size()==0)
    253                                 i = entries.erase(i)-1;
    254                 }
    255         }
    256 };
    25767
    25868// ----------------------------------------------------------------------------
     
    758568                        sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
    759569                        counter(0) {
    760         initDHT();
    761570}
    762571
    763572BaseOverlay::~BaseOverlay() {
    764         destroyDHT();
    765573}
    766574
     
    1078886}
    1079887
     888
    1080889seqnum_t BaseOverlay::sendMessage(const Message* message,
    1081890                const NodeID& node, const ServiceID& service) {
     
    1114923}
    1115924
     925
     926NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
     927        const NodeID& address, const ServiceID& service) {
     928   
     929    if ( overlayInterface->isClosestNodeTo(address) )
     930    {
     931        return NodeID::UNSPECIFIED;
     932    }
     933       
     934    const NodeID& closest_node = overlayInterface->getNextNodeId(address);
     935   
     936    if ( closest_node != NodeID::UNSPECIFIED )
     937    {
     938        seqnum_t seqnum = sendMessage(message, closest_node, service);
     939    }
     940   
     941    return closest_node;  // XXX return seqnum ?? tuple? closest_node via (non const) reference?
     942}
    1116943// ----------------------------------------------------------------------------
    1117944
     
    18311658        overlayMsg->addRouteRecord(nodeId);
    18321659
    1833         // handle dht messages (do not route)
    1834         if (overlayMsg->isDHTMessage()) {
    1835                 bool ret = handleDHTMessage(overlayMsg);
    1836                 delete overlayMsg;
    1837                 return ret;
    1838         }
    1839 
    18401660        // handle signaling messages (do not route!)
    18411661        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
     
    18561676                delete overlayMsg;
    18571677                return true;
    1858         }
    1859 
    1860         // handle DHT response messages
    1861         if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
    1862                 bool ret = handleDHTMessage(overlayMsg);
    1863                 delete overlayMsg;
    1864                 return ret;
    18651678        }
    18661679
     
    19641777        stabilizeRelays();
    19651778        stabilizeLinks();
    1966         stabilizeDHT();
    19671779        updateVisual();
    19681780}
     
    21101922// ----------------------------------------------------------------------------
    21111923
    2112 void BaseOverlay::initDHT() {
    2113         dht = new DHT();
    2114         localDHT = new DHT();
    2115         republishCounter = 0;
    2116 }
    2117 
    2118 void BaseOverlay::destroyDHT() {
    2119         delete dht;
    2120         delete localDHT;
    2121 }
    2122 
    2123 /// stabilize DHT state
    2124 void BaseOverlay::stabilizeDHT() {
    2125 
    2126         // do refresh every 2 seconds
    2127         if (republishCounter < 2) {
    2128                 republishCounter++;
    2129                 return;
    2130         }
    2131         republishCounter = 0;
    2132 
    2133         // remove old values from DHT
    2134         BOOST_FOREACH( DHTEntry& entry, dht->entries ) {
    2135                 // erase old entries
    2136                 entry.erase_expired_entries();
    2137         }
    2138 
    2139         // re-publish values-> do not refresh locally stored values
    2140         BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {
    2141                 BOOST_FOREACH( ValueEntry& value, entry.values )
    2142                         dhtPut(entry.key, value.get_value(), value.get_ttl(), false, true );
    2143         }
    2144 }
    2145 
    2146 // handle DHT messages
    2147 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
    2148 
    2149         // de-capsulate message
    2150         logging_debug("Received DHT message");
    2151         DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
    2152 
    2153         // handle DHT data message
    2154         if (msg->getType()==OverlayMsg::typeDHTData) {
    2155                 const ServiceID& service = msg->getService();
    2156                 logging_info( "Received DHT data for service " << service.toString() );
    2157 
    2158                 // delegate data message
    2159                 CommunicationListener* lst = getListener(service);
    2160                 if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
    2161                 delete dhtMsg;
    2162                 return true;
    2163         }
    2164 
    2165         // route message to closest node
    2166         if (!overlayInterface->isClosestNodeTo(dhtMsg->getHashedKey())) {
    2167                 logging_debug("Routing DHT message to closest node "
    2168                         << " from " << msg->getSourceNode()
    2169                         << " to " << dhtMsg->getHashedKey()
    2170                 );
    2171                 dhtSend(msg, dhtMsg->getHashedKey());
    2172                 delete dhtMsg;
    2173                 return true;
    2174         }
    2175 
    2176         // now, we are the closest node...
    2177         switch (msg->getType()) {
    2178 
    2179         // ----------------------------------------------------------------- put ---
    2180         case OverlayMsg::typeDHTPut: {
    2181                 logging_debug("DHT-Put: Attempt to store values for key "
    2182                                 << dhtMsg->getKey());
    2183                 if (dhtMsg->doReplace()) {
    2184                         logging_debug("DHT-Put: Attempt to replace key: remove old values first!");
    2185                         dht->remove(dhtMsg->getKey());
    2186                 }
    2187                 BOOST_FOREACH( Data value, dhtMsg->getValues() ) {
    2188                         logging_debug("DHT-Put: Stored value: " << value );
    2189                         dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
    2190                 }
    2191                 break;
    2192         }
    2193 
    2194         // ----------------------------------------------------------------- get ---
    2195         case OverlayMsg::typeDHTGet: {
    2196                 logging_info("DHT-Get: key=" << dhtMsg->getKey() );
    2197                 vector<Data> vect = dht->get(dhtMsg->getKey());
    2198                 BOOST_FOREACH(const Data& d, vect)
    2199                         logging_info("DHT-Get: value=" << d);
    2200                 OverlayMsg omsg(*msg);
    2201                 omsg.swapRoles();
    2202                 omsg.setType(OverlayMsg::typeDHTData);
    2203                 DHTMessage dhtmsg(dhtMsg->getKey(), vect);
    2204                 omsg.encapsulate(&dhtmsg);
    2205                
    2206                 logging_info("Sending DHT response to " << omsg.getDestinationNode());
    2207                 sendMessage(&omsg, omsg.getDestinationNode());
    2208                 break;
    2209         }
    2210 
    2211         // -------------------------------------------------------------- remove ---
    2212         case OverlayMsg::typeDHTRemove: {
    2213                 if (dhtMsg->hasValues()) {
    2214                         BOOST_FOREACH( Data value, dhtMsg->getValues() )
    2215                                                         dht->remove(dhtMsg->getKey(), value );
    2216                 } else
    2217                         dht->remove( dhtMsg->getKey() );
    2218                 break;
    2219         }
    2220 
    2221         // -------------------------------------------------------------- default---
    2222         default:
    2223                 logging_error("DHT Message type unknown.");
    2224                 return false;
    2225         }
    2226         delete dhtMsg;
    2227         return true;
    2228 }
    2229 
    2230 /// put a value to the DHT with a ttl given in seconds
    2231 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace, bool no_local_refresh ) {
    2232 
    2233         // log
    2234         logging_info("DHT-Put:"
    2235                 << " key=" << key << " value=" << value
    2236                 << " ttl=" << ttl << " replace=" << replace
    2237         );
    2238 
    2239         if (!no_local_refresh) {
    2240 
    2241                 // put into local data store (for refreshes)
    2242                 if (replace) localDHT->remove(key);
    2243                 localDHT->put(key, value, ttl);
    2244         }
    2245 
    2246         DHTMessage dhtmsg( key, value );
    2247         dhtmsg.setReplace( replace );
    2248         dhtmsg.setTTL(ttl);
    2249 
    2250         OverlayMsg msg( OverlayMsg::typeDHTPut );
    2251         msg.encapsulate( &dhtmsg );
    2252         msg.setSourceNode(this->nodeId);
    2253         dhtSend(&msg, dhtmsg.getHashedKey());
    2254 }
    2255 
    2256 /// removes a key value pair from the DHT
    2257 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
    2258         // remove from local data store
    2259         localDHT->remove(key,value);
    2260 
    2261         DHTMessage dhtmsg(key,value);
    2262 
    2263         // send message
    2264         OverlayMsg msg(OverlayMsg::typeDHTRemove);
    2265         msg.encapsulate( &dhtmsg );
    2266         msg.setSourceNode(this->nodeId);
    2267         dhtSend(&msg, dhtmsg.getHashedKey());
    2268 }
    2269 
    2270 /// removes all data stored at the given key
    2271 void BaseOverlay::dhtRemove( const Data& key ) {
    2272         // log: remove key
    2273         logging_info("DHT-Remove: Removing key=" << key );
    2274 
    2275         DHTMessage dhtmsg(key);
    2276 
    2277         // send message
    2278         OverlayMsg msg(OverlayMsg::typeDHTRemove);
    2279         msg.encapsulate( &dhtmsg );
    2280         msg.setSourceNode(this->nodeId);
    2281         dhtSend(&msg, dhtmsg.getHashedKey());
    2282 }
    2283 
    2284 /// requests data stored using key
    2285 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
    2286         // log: get
    2287         logging_info("DHT-Get: Trying to resolve key=" <<
    2288                         key << " for service=" << service.toString() );
    2289 
    2290         DHTMessage dhtmsg(key);
    2291 
    2292         // send message
    2293         OverlayMsg msg(OverlayMsg::typeDHTGet);
    2294         msg.setService(service);
    2295         msg.encapsulate( &dhtmsg );
    2296         msg.setSourceNode(this->nodeId);
    2297         dhtSend(&msg, dhtmsg.getHashedKey());
    2298 }
    2299 
    2300 void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {
    2301         // log: dht send
    2302         logging_info("DHT-Send: Sending message with key=" << dest.toString() );
    2303 
    2304         // local storage? yes-> put into DHT directly
    2305         if (overlayInterface->isClosestNodeTo(dest)) {
    2306                 // be compatible with old code so set destination to hashed key
    2307                 msg->setDestinationNode(dest);
    2308                
    2309                 Data d = data_serialize(msg);
    2310                 Message m2(d);
    2311                 OverlayMsg* m3 = m2.decapsulate<OverlayMsg>();
    2312                
    2313                 handleDHTMessage(m3);
    2314                
    2315                 delete m3;
    2316                 return;
    2317         } else {
    2318                 // need to route
    2319                 NodeID next_hop = overlayInterface->getNextNodeId(dest);
    2320                 msg->setDestinationNode(next_hop);
    2321                 logging_info("DHT-Send: sending via node " << next_hop.toString());
    2322                
    2323                 send(msg, next_hop);
    2324                
    2325                 return;
    2326         }
    2327 }
    2328 
    23291924std::string BaseOverlay::debugInformation() {
    23301925        std::stringstream s;
  • source/ariba/overlay/BaseOverlay.h

    r7532 r10653  
    7474using std::vector;
    7575using std::list;
    76 using std::cout;
    7776using std::map;
    7877using std::make_pair;
     
    187186                const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
    188187
     188    /**
     189     *  send a message to the closest directly known node to an address
     190     * 
     191     *  @return NodeID of the (closest) destination node;
     192     */
     193        NodeID sendMessageCloserToNodeID(const Message* message, const NodeID& address,
     194                const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);
     195       
    189196        /**
    190197         * Send out a message to all nodes that are known in the overlay structure.
     
    287294         */
    288295        void leaveSpoVNet();
    289 
    290         /// put a value to the DHT with a ttl given in seconds
    291         void dhtPut( const Data& key, const Data& value, int ttl = 0, bool replace = false, bool no_local_refresh = false);
    292 
    293         /// removes a key value pair from the DHT
    294         void dhtRemove( const Data& key, const Data& value );
    295 
    296         /// removes all data stored at the given key
    297         void dhtRemove( const Data& key );
    298 
    299         /// requests data stored using key
    300         void dhtGet( const Data& key, const ServiceID& service );
    301296
    302297protected:
     
    411406        bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink );
    412407        bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink );
    413 
    414         // handle DHT messages
    415         bool handleDHTMessage( OverlayMsg* msg );
    416408
    417409        // handle link messages
     
    506498                bool ignore_down = false );
    507499
    508         // distributed hashtable handling ------------------------------------------
    509 
    510         DHT* dht;
    511         DHT* localDHT;
    512         int republishCounter;
    513 
    514         void initDHT();
    515         void destroyDHT();
    516         void stabilizeDHT();
    517         void dhtSend( OverlayMsg* msg, const NodeID& dest );
    518 
    519500        // misc --------------------------------------------------------------------
    520501
  • source/ariba/overlay/messages/OverlayMsg.h

    r6919 r10653  
    8989                typeLinkAlive   = 0x35, ///< keep-alive message
    9090
    91                 // DHT routed messages
     91                /// DHT routed messages
     92                /// @deprecated because the DHT has been moved into a separate service
    9293                maskDHT                 = 0x40, ///< bit mask for dht messages
    9394                typeDHTPut      = 0x41, ///< DHT put operation
     
    9697
    9798                /// DHT response messages
     99                /// @deprecated because the DHT has been moved into a separate service
    98100                maskDHTResponse = 0x50, ///< bit mask for dht responses
    99101                typeDHTData     = 0x51, ///< DHT get data
     
    197199        }
    198200
    199         bool isDHTMessage() const {
    200                 return hasTypeMask(maskDHT);
    201         }
    202 
    203201        /// number of hops and time to live ----------------------------------------
    204202
  • source/ariba/utility/addressing

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/addressing/endpoint_set.hpp

    r6919 r10653  
    149149                        std::string sub = str.substr(pos, min(nend2,nend1)-pos);
    150150                        trim(sub);
    151 //                      cout << sub << endl;
    152151                        V obj( sub );
    153152                        set.insert(obj);
  • source/ariba/utility/bootstrap/modules/bluetoothsdp

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h

    r7532 r10653  
    5555#include "PeriodicBroadcastMessage.h"
    5656
     57//link-local
     58#include "ariba/utility/transport/tcpip/tcpip.hpp"
     59
    5760using std::map;
    5861using std::string;
    59 using std::cout;
    6062using boost::asio::ip::udp;
    6163
     
    296298                        {
    297299                                udp::endpoint endp(udp::v6(), PeriodicBroadcast::serverport_v6);
    298                                 endp.address( boost::asio::ip::address_v6::from_string("ff02::1") );
    299                                 socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
    300                                 if(err) logging_warn("failed sending message through ipv6 socket");
     300                                boost::asio::ip::address_v6 all_nodes = boost::asio::ip::address_v6::from_string("ff02::1");
     301                               
     302                                // include all link-local interfaces
     303                                vector<uint64_t> scope_ids = ariba::transport::tcpip::get_interface_scope_ids();
     304                               
     305                                BOOST_FOREACH ( uint64_t id, scope_ids )
     306                                {
     307                    all_nodes.scope_id(id);
     308                    endp.address( all_nodes );
     309                   
     310                    socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
     311                    if(err) logging_warn("failed sending message through ipv6 socket");
     312                                }
    301313                        }
    302314                }
  • source/ariba/utility/misc/Helper.h

    r9770 r10653  
    6767using std::setfill;
    6868using std::setw;
    69 using std::cout;
    7069using std::string;
    7170using std::ostream;
  • source/ariba/utility/serialization/Data.hpp

    r9695 r10653  
    4141
    4242//== library includes ==
     43#include <string.h>
    4344#include <stdlib.h>
    4445#include <iostream>
  • source/ariba/utility/system/Timer.cpp

    r7821 r10653  
    104104
    105105void Timer::eventFunction() {
    106         //std::cout << "unimplemented eventFunction Timer(" << millis << ")" << std::endl;
     106        logging_warn("unimplemented eventFunction Timer(" << millis << ")");
    107107}
    108108
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/tcpip/tcpip.cpp

    r10075 r10653  
    11#include "tcpip.hpp"
    22
    3 #define _NO_LOGGING
    4 
    5 // std includes
    6 #include <unistd.h>
    7 #include <iostream>
    8 #include <string>
    9 #include <sstream>
    10 #include <boost/foreach.hpp>
    11 
    12 // protlib includes
    13 #include "protlib/network_message.h"
    14 #include "protlib/tp_over_tcp.h"
    15 #include "protlib/tperror.h"
    16 #include "protlib/logfile.h"
    17 #include "protlib/queuemanager.h"
    18 #include "protlib/threadsafe_db.h"
    19 #include "protlib/setuid.h"
    20 
    21 // protlib namespaces
    22 using namespace protlib;
    23 using namespace protlib::log;
    24 
    25 logfile commonlog;
    26 protlib::log::logfile& protlib::log::DefaultLog(commonlog);
     3#include <boost/array.hpp>
     4
     5// interface discovery for link-local destinations
     6#include <ifaddrs.h>
    277
    288namespace ariba {
    299namespace transport {
    3010
     11use_logging_cpp(tcpip)
     12
    3113using namespace ariba::addressing;
    3214
    33 
    34 tcpip_endpoint convert( const appladdress* addr ) {
    35         const char* ip_str = addr->get_ip_str();
    36         tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
    37         return endpoint;
    38 }
    39 
    40 appladdress convert( const tcpip_endpoint& endpoint ) {
    41         tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
    42         appladdress
    43                 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
    44 //      cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
    45         return peer;
    46 }
    47 
    48 tcpip::tcpip( uint16_t port ) :
    49         done ( false ),
    50         running ( false ),
    51         port( port ),
    52         tpreceivethread ( NULL ),
    53         tpthread ( NULL ),
    54         listener ( NULL ) {
    55 }
    56 
    57 tcpip::~tcpip() {
    58         if (running) stop();
    59 }
    60 
    61 bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
    62         clen_bytes = m.decode32();
    63         m.set_pos_r(-4);
    64         return true;
    65 }
    66 
    67 void tcpip::start() {
    68         done = false;
    69         running = false;
    70 
    71         // initalize netdb and setuid
    72         protlib::tsdb::init();
    73         protlib::setuid::init();
    74 
    75         // set tcp parameters
    76         port_t port = this->port; // port
    77         TPoverTCPParam tppar(4, get_message_length, port);
    78 
    79         // create receiver thread
    80         FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
    81         QueueManager::instance()->register_queue(tpchecker_fq,
    82                         message::qaddr_signaling);
    83 
    84         // start thread
    85         pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
    86         tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
    87         tpthread->start_processing();
    88 }
    89 
    90 void tcpip::stop() {
    91         // stop receiver thread
    92         done = true;
    93 
    94         // stop TPoverTCP
    95         tpthread->stop_processing();
    96         tpthread->abort_processing(true);
    97         tpthread->wait_until_stopped();
    98 
    99         // unregister TPoverTCP
    100         delete QueueManager::instance()->get_queue( message::qaddr_signaling );
    101         QueueManager::instance()->unregister_queue( message::qaddr_signaling );
    102 
    103         // destroy QueueManager
    104         QueueManager::clear();
    105 
    106         // de-initalize netdb and setuid
    107         protlib::setuid::end();
    108         protlib::tsdb::end();
    109 
    110         // wait for thread to finish and delete
    111         pthread_join(tpreceivethread, NULL);
    112 }
    113 
    114 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
    115 
    116         // prepare netmsg with length and and port
    117         NetMsg* datamsg = new NetMsg(size + 6);
    118         datamsg->encode32( size + 2,  true );
    119         datamsg->encode16( this->port,true );
    120 
    121         for (size_t i=0; i<size; i++)
    122                 datamsg->encode8( data[i],true );
    123 
    124         // send message
    125         tcpip_endpoint endpoint = *remote;
    126         appladdress peer = convert(endpoint);
    127 
    128         // add to output queue
    129         tpthread->get_thread_object()->send( datamsg, peer, false );
    130 }
    131 
    132 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
    133         // send a message to each combination of ip-address and port
    134         BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
    135                 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
    136                         tcpip_endpoint endpoint(ip,port);
    137                         address_vf vf = endpoint;
    138                         send(vf,data,size);
    139                 }
    140         }
    141 }
    142 
    143 void tcpip::terminate( const address_v* remote) {
    144         tcpip_endpoint endpoint = *remote;
    145         appladdress peer = convert(endpoint);
    146         peer.convert_to_ipv6();
    147         tpthread->get_thread_object()->terminate( peer );
    148 }
    149 
    150 void tcpip::register_listener( transport_listener* listener ) {
    151         this->listener = listener;
    152 }
    153 
    154 void* tcpip::receiverThread( void* ptp ) {
    155         // get reference to transport object
    156         tcpip& tp = *((tcpip*)ptp);
    157 
    158         // get queue
    159         FastQueue* fq =
    160                 QueueManager::instance()->get_queue(message::qaddr_signaling);
    161 
    162         // main processing loop
    163         tp.running = true;
    164         while (!tp.done) {
    165 
    166                 // wait for new message to approach
    167                 message* msg = fq->dequeue_timedwait(300);
    168 
    169                 // message has arrived? no-> continue
    170                 if (!msg) continue;
    171 
    172                 // handle transport message
    173                 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
    174                 if (!tpmsg) {
    175                         delete msg;
    176                         continue;
    177                 }
    178 
    179                 // get address & message
    180                 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
    181                 const appladdress* local_peer  = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
    182                 NetMsg* datamsg = tpmsg->get_message();
    183 
    184                 // not a data message? -> continue!
    185                 if (!datamsg) {
    186                         delete tpmsg;
    187                         continue;
    188                 }
    189 
    190                 // get length and remote endpoint port
    191                 datamsg->set_pos(0);
    192                 uint32_t message_size = datamsg->decode32(true)-2;
    193                 //uint16_t remote_port = datamsg->decode16(true);
    194 
    195                 // inform listener
    196                 if (tp.listener != NULL) {
    197                         tcpip_endpoint remote = convert(remote_peer);
    198                         tcpip_endpoint local  = convert(local_peer);
    199                         tp.listener->receive_message(
    200                                         &tp, local, remote, datamsg->get_buffer()+6, message_size );
    201                 }
    202 
    203                 tpmsg->set_message(NULL);
    204                 delete datamsg;
    205                 delete tpmsg;
    206         }
    207         // clean queue & stop
    208         fq->cleanup();
    209         tp.running = false;
    210         return NULL;
     15typedef boost::mutex::scoped_lock unique_lock;
     16
     17tcpip::tcpip( const tcp::endpoint& endp )  :
     18        listener(NULL),
     19        acceptor(u_io_service.get_asio_io_service(), endp)
     20{
     21}
     22
     23tcpip::~tcpip(){}
     24
     25void tcpip::start()
     26{
     27    // open server socket
     28    accept();
     29   
     30    u_io_service.start();
     31}
     32
     33
     34void tcpip::stop()
     35{
     36    acceptor.close();
     37   
     38    u_io_service.stop();
     39}
     40
     41
     42/* see header file for comments */
     43void tcpip::send(
     44        const tcp::endpoint& dest_addr,
     45        reboost::message_t message,
     46        uint8_t priority)
     47{
     48    ConnPtr conn;
     49    bool need_to_connect = false;
     50   
     51    {
     52        unique_lock lock(connections_lock);
     53       
     54        ConnectionMap::iterator it = connections.find(dest_addr);
     55        if (it == connections.end())
     56        {
     57            ConnPtr tmp_ptr(
     58                    new tcpip_connection(
     59                            u_io_service.get_asio_io_service(),
     60                            shared_from_this() )
     61                    );
     62            conn = tmp_ptr;
     63           
     64            conn->partner = dest_addr;
     65            conn->remote = convert_address(dest_addr);
     66           
     67            // Note: starting the send is the obligation of the connect_handler
     68            // (avoids trying to send while not connected yet)
     69            conn->sending =  true;
     70            need_to_connect = true;
     71           
     72            ConnectionMap::value_type item(dest_addr, conn);
     73            connections.insert(item);
     74           
     75        } else {
     76            conn = it->second;
     77        }
     78    }
     79   
     80   
     81    // * the actual send *
     82    conn->enqueue_for_sending(message, priority);
     83   
     84    // if new connection connect to the other party
     85    if ( need_to_connect )
     86    {
     87        conn->sock.async_connect(
     88                dest_addr,
     89                boost::bind(
     90                        &tcpip_connection::async_connect_handler,
     91                        conn,
     92                        boost::asio::placeholders::error));
     93    }
     94}
     95
     96
     97/* see header file for comments */
     98void tcpip::send(
     99        const address_v* remote,
     100        reboost::message_t message,
     101        uint8_t priority)
     102{
     103    send(convert_address(remote), message, priority);
     104}
     105
     106
     107/* see header file for comments */
     108void tcpip::send(
     109        const endpoint_set& endpoints,
     110        reboost::message_t message,
     111        uint8_t priority )
     112{
     113    // network interfaces scope_ids, for link-local connections (lazy initialization)
     114    vector<uint64_t> scope_ids;
     115   
     116    // send a message to each combination of address-address and port
     117    BOOST_FOREACH( const ip_address address, endpoints.ip ) {
     118        BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
     119            tcp::endpoint endp(address.asio(), port.asio());
     120           
     121            // special treatment for link local addresses
     122            //   ---> send over all (suitable) interfaces
     123            if ( endp.address().is_v6() )
     124            {
     125                boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
     126               
     127                if ( v6_addr.is_link_local() )
     128                {
     129                    // initialize scope_ids
     130                    if ( scope_ids.size() == 0 )
     131                        scope_ids = get_interface_scope_ids();
     132                   
     133                    BOOST_FOREACH ( uint64_t id, scope_ids )
     134                    {                       
     135                        v6_addr.scope_id(id);
     136                        endp.address(v6_addr);
     137   
     138                        logging_debug("------> SEND TO (link-local): " << endp);
     139                        // * send *
     140                        send(endp, message, priority);
     141                    }
     142                }
     143               
     144                continue;
     145            }
     146           
     147            // * send *
     148            send(endp, message, priority);
     149        }
     150    }
     151}
     152
     153
     154void tcpip::register_listener( transport_listener* listener )
     155{
     156    this->listener = listener;
     157}
     158
     159
     160void tcpip::terminate( const address_v* remote )
     161{
     162    terminate(convert_address(remote));
     163}
     164
     165void tcpip::terminate( const tcp::endpoint& remote )
     166{
     167    ConnPtr conn;
     168   
     169    // find and forget connection
     170    {
     171        unique_lock lock(connections_lock);
     172       
     173        ConnectionMap::iterator it = connections.find(remote);
     174        if (it == connections.end())
     175        {
     176            return;
     177        }
     178       
     179        conn = it->second;
     180       
     181        connections.erase(it);
     182    }
     183
     184    // close connection
     185    boost::system::error_code ec;
     186    conn->sock.shutdown(tcp::socket::shutdown_both, ec);
     187    conn->sock.close(ec);
     188}
     189
     190
     191/* private */
     192void tcpip::accept()
     193{
     194    // create new connection object
     195    ConnPtr conn(
     196            new tcpip_connection(
     197                    u_io_service.get_asio_io_service(),
     198                    shared_from_this()
     199            )
     200    );
     201   
     202    // wait for incoming connection
     203    acceptor.async_accept(
     204            conn->sock,
     205            boost::bind(&self::async_accept_handler,
     206                    this->shared_from_this(),
     207                    conn,
     208                    boost::asio::placeholders::error)
     209    );
     210}
     211
     212void tcpip::async_accept_handler(ConnPtr conn, const error_code& error)
     213{
     214    if ( ! error )
     215    {
     216        conn->partner = conn->sock.remote_endpoint();
     217        conn->remote = convert_address(conn->partner);
     218        conn->local = convert_address(conn->sock.local_endpoint());
     219       
     220        {
     221            unique_lock lock(connections_lock);
     222           
     223            ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
     224            connections.insert(item);
     225        }
     226       
     227        // read
     228        conn->listen();
     229    }
     230   
     231    // accept further connections
     232    accept();
     233}
     234
     235inline tcp::endpoint tcpip::convert_address( const address_v* address )
     236{
     237    tcpip_endpoint endpoint = *address;
     238   
     239    return tcp::endpoint(
     240        endpoint.address().asio(), endpoint.port().value()
     241    );
     242}
     243
     244
     245inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint)
     246{
     247    ip_address address;
     248    address.asio(endpoint.address());
     249    tcp_port_address port;
     250    port.value(endpoint.port());
     251    return tcpip_endpoint(address, port);
     252}
     253
     254
     255vector<uint64_t> tcpip::get_interface_scope_ids()
     256{
     257    vector<uint64_t> ret;
     258   
     259    struct ifaddrs* ifaceBuffer = NULL;
     260    void*           tmpAddrPtr  = NULL;
     261   
     262    int ok = getifaddrs( &ifaceBuffer );
     263    if( ok != 0 ) return ret;
     264
     265    for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
     266
     267        // ignore devices that are disabled or have no ip
     268        if(i == NULL) continue;
     269        struct sockaddr* addr = i->ifa_addr;
     270        if (addr==NULL) continue;
     271
     272        // only use ethX and wlanX devices
     273        string device = string(i->ifa_name);
     274        if ( (device.find("eth") == string::npos) &&
     275              (device.find("wlan")  == string::npos) /* &&
     276              (device.find("lo")  == string::npos) XXX */ )
     277        {
     278            continue;
     279        }
     280
     281        // only use interfaces with ipv6 link-local addresses
     282        if (addr->sa_family == AF_INET6)
     283        {
     284            // convert address
     285            // TODO should be possible without detour over strings
     286            char straddr[INET6_ADDRSTRLEN];
     287            tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
     288            inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     289
     290            address_v6 v6addr = address_v6::from_string(straddr);
     291            if ( v6addr.is_link_local() )
     292            {
     293                // * append the scope_id to the return vector *
     294                ret.push_back(if_nametoindex(i->ifa_name));
     295            }
     296
     297        }
     298    }
     299
     300    freeifaddrs(ifaceBuffer);
     301   
     302    return ret;
     303}
     304
     305
     306/*****************
     307 ** inner class **
     308 *****************/
     309
     310tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent)  :
     311        sock(io_service),
     312        valid(true),
     313        parent(parent),
     314        out_queues(8), //TODO How much priorities shall we have?
     315        sending(false)
     316{
     317        header.length = 0;
     318        header.prot = 0;
     319}
     320
     321/*-------------------------------------------
     322 | implement transport_connection interface |
     323 -------------------------------------------*/
     324void tcpip::tcpip_connection::send(
     325        reboost::message_t message,
     326        uint8_t priority)
     327{
     328    enqueue_for_sending(message, priority);
     329}
     330
     331
     332address_vf tcpip::tcpip_connection::getLocalEndpoint()
     333{
     334    return local;
     335}
     336
     337
     338address_vf tcpip::tcpip_connection::getRemoteEndpoint()
     339{
     340    return remote;
     341}
     342
     343
     344void tcpip::tcpip_connection::terminate()
     345{
     346    parent->terminate(partner);
     347}
     348
     349
     350/*------------------------------
     351 | things we defined ourselves |
     352 ------------------------------*/
     353void tcpip::tcpip_connection::async_connect_handler(const error_code& error)
     354{
     355    if (error)
     356    {
     357        parent->terminate(partner);
     358
     359        return;
     360    }
     361   
     362    // save address in ariba format
     363    local = parent->convert_address(sock.local_endpoint());
     364   
     365    // Note: sending has to be true at this point
     366    send_next_package();
     367   
     368    listen();
     369}
     370
     371
     372void tcpip::tcpip_connection::listen()
     373{
     374    boost::asio::async_read(
     375            this->sock,
     376            boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
     377            boost::bind(
     378                    &tcpip::tcpip_connection::async_read_header_handler,
     379                    this->shared_from_this(),
     380                    boost::asio::placeholders::error,
     381                    boost::asio::placeholders::bytes_transferred
     382            )
     383    );
     384}
     385
     386
     387void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
     388{
     389    if (error)
     390    {
     391        parent->terminate(partner);
     392
     393        return;
     394    }
     395
     396    // convert byte order
     397    header.length = ntohl(header.length);
     398    header.length -= 2;  // XXX protlib
     399   
     400    assert(header.length > 0);
     401   
     402    // new buffer for the new packet
     403    buffy = shared_buffer_t(header.length);
     404
     405    // * read data *
     406    boost::asio::async_read(
     407            this->sock,
     408            boost::asio::buffer(buffy.mutable_data(), buffy.size()),
     409            boost::bind(
     410                    &tcpip::tcpip_connection::async_read_data_handler,
     411                    this->shared_from_this(),
     412                    boost::asio::placeholders::error,
     413                    boost::asio::placeholders::bytes_transferred
     414            )
     415    );
     416}
     417
     418void tcpip::tcpip_connection::async_read_data_handler(
     419        const error_code& error, size_t bytes_transferred)
     420{
     421    if (error)
     422    {
     423        parent->terminate(partner);
     424
     425        return;
     426    }
     427   
     428    message_t msg;
     429    msg.push_back(buffy);
     430    buffy = shared_buffer_t();
     431
     432    if ( parent->listener )
     433        parent->listener->receive_message(shared_from_this(), msg);
     434   
     435    listen();
     436}
     437
     438/* see header file for comments */
     439void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
     440{
     441    if ( error )
     442    {       
     443        // remove this connection
     444        parent->terminate(partner);
     445
     446        return;
     447    }
     448   
     449    send_next_package();
     450}
     451
     452
     453
     454void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority)
     455{
     456    bool restart_sending = false;
     457   
     458    // enqueue packet  [locked]
     459    {
     460        unique_lock(out_queues_lock);
     461       
     462        assert( priority < out_queues.size() );
     463        out_queues[priority].push(packet);
     464       
     465        if ( ! sending )
     466        {
     467            restart_sending = true;
     468            sending = true;
     469        }
     470    }
     471   
     472    // if sending was stopped, we have to restart it here
     473    if ( restart_sending )
     474    {
     475        send_next_package();
     476    }
     477}
     478
     479/* see header file for comments */
     480void tcpip::tcpip_connection::send_next_package()
     481{
     482    Packet packet;
     483    bool found = false;
     484
     485    // find packet with highest priority  [locked]
     486    {
     487        unique_lock(out_queues_lock);
     488       
     489        for ( vector<OutQueue>::iterator it = out_queues.begin();
     490                it != out_queues.end(); it++ )
     491        {
     492            if ( !it->empty() )
     493            {
     494                packet = it->front();
     495                it->pop();
     496                found = true;
     497               
     498                break;
     499            }
     500        }
     501       
     502        // no packets waiting --> stop sending
     503        if ( ! found )
     504        {
     505            sending = false;
     506        }
     507    }
     508   
     509    // * send *
     510    if ( found )
     511    {
     512        reboost::shared_buffer_t header_buf(sizeof(header_t));
     513        header_t* header = (header_t*)(header_buf.mutable_data());
     514        header->length = htonl(packet.size()+2);  // XXX protlib
     515       
     516        packet.push_front(header_buf);
     517       
     518        // "convert" message to asio buffer sequence
     519        vector<boost::asio::const_buffer> send_sequence(packet.length());
     520        for ( int i=0; i < packet.length(); i++ )
     521        {
     522            shared_buffer_t b = packet.at(i);
     523            send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
     524        }
     525       
     526        // * async write *
     527        boost::asio::async_write(
     528                this->sock,
     529                send_sequence,
     530                boost::bind(
     531                        &tcpip::tcpip_connection::async_write_handler,
     532                        this->shared_from_this(),
     533                        packet,  // makes sure our shared pointer lives long enough ;-)
     534                        boost::asio::placeholders::error,
     535                        boost::asio::placeholders::bytes_transferred)
     536        );
     537    }
    211538}
    212539
  • source/ariba/utility/transport/tcpip/tcpip.hpp

    r5993 r10653  
    33
    44#include "ariba/utility/transport/transport.hpp"
    5 #include <pthread.h>
    6 
    7 // forward declaration
    8 namespace protlib {
    9 template<class X, class Y>
    10 class ThreadStarter;
    11 class TPoverTCP;
    12 class TPoverTCPParam;
    13 }
     5#include "ariba/utility/transport/asio/unique_io_service.h"
     6#include "ariba/utility/transport/transport_connection.hpp"
     7#include "ariba/utility/addressing/tcpip_endpoint.hpp"
     8#include <boost/asio.hpp>
     9#include <boost/shared_ptr.hpp>
     10#include <boost/enable_shared_from_this.hpp>
     11#include <queue>
     12#include "ariba/utility/transport/messages/buffers.hpp"
     13#include "ariba/utility/logging/Logging.h"
    1414
    1515namespace ariba {
    1616namespace transport {
    1717
    18 using namespace protlib;
     18using namespace std;
     19using ariba::transport::detail::unique_io_service;
     20using ariba::addressing::tcpip_endpoint;
     21using boost::asio::ip::tcp;
     22using boost::asio::ip::address_v6;
     23using boost::system::error_code;
     24using reboost::shared_buffer_t;
     25using reboost::message_t;
    1926
    20 /**
    21  * TODO: Doc
    22  *
    23  * @author Sebastian Mies <mies@tm.uka.de>
    24  */
    25 class tcpip : public transport_protocol {
     27class tcpip;
     28typedef boost::shared_ptr<tcpip> TcpIpPtr;
     29
     30class tcpip :
     31    public transport_protocol,
     32    public boost::enable_shared_from_this<tcpip>
     33{
     34    typedef tcpip self;
     35use_logging_h(tcpip)
     36
     37private:
     38    class tcpip_connection :
     39        public transport_connection,
     40        public boost::enable_shared_from_this<tcpip_connection>
     41    {
     42    public:
     43        typedef reboost::message_t Packet;
     44        typedef std::queue<Packet> OutQueue;
     45       
     46        struct header_t
     47        {
     48            uint32_t length;
     49            uint16_t prot;  // XXX protlib
     50        } __attribute__((packed));
     51           
     52        tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent);
     53       
     54        /// Inherited from transport_connection
     55        virtual void send(reboost::message_t message, uint8_t priority = 0);
     56        virtual address_vf getLocalEndpoint();
     57        virtual address_vf getRemoteEndpoint();
     58        virtual void terminate();
     59       
     60        void listen();
     61       
     62        void async_connect_handler(const error_code& error);
     63       
     64        void async_read_header_handler(const error_code& error, size_t bytes_transferred);
     65        void async_read_data_handler(const error_code& error, size_t bytes_transferred);
     66       
     67        /*
     68         * is called from asio when write operation "returns",
     69         * calls private function `send_next_package()`
     70         */
     71        void async_write_handler(
     72                reboost::shared_buffer_t packet,
     73                const error_code& error,
     74                size_t bytes_transferred);
     75
     76       
     77        void enqueue_for_sending(Packet packet, uint8_t priority);
     78       
     79    private:
     80        /*
     81         * is called from `send` or `async_write_handler` to begin/keep sending
     82         * sends the next message with the highest priority in this connection
     83         */
     84        void send_next_package();
     85
     86
     87    public:
     88        tcp::socket sock;
     89        bool valid;
     90        TcpIpPtr parent;
     91       
     92        tcp::endpoint partner;
     93        tcpip_endpoint remote;
     94        tcpip_endpoint local;
     95       
     96        vector<OutQueue> out_queues;     // to be locked with out_queues_lock
     97        boost::mutex out_queues_lock;
     98       
     99        bool sending;       // to be locked with out_queues_lock
     100       
     101        header_t header;
     102        shared_buffer_t buffy;
     103    };
     104    typedef boost::shared_ptr<tcpip_connection> ConnPtr;
     105    typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap;
     106   
    26107public:
    27         tcpip( uint16_t port );
     108        tcpip( const tcp::endpoint& endp );
    28109        virtual ~tcpip();
    29110        virtual void start();
    30111        virtual void stop();
    31         virtual void send( const address_v* remote, const uint8_t* data, size_t size );
    32         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size );
     112       
     113        /**
     114     * enqueues message for sending
     115     * create new connection if necessary
     116     * starts sending mechanism (if not already running)
     117     */
     118    void send(
     119            const tcp::endpoint&,
     120            reboost::message_t message,
     121            uint8_t priority = 0 );
     122       
     123        /**
     124         * Converts address_v to tcp::endpoint and calls the real send() function
     125         */
     126        virtual void send(
     127                const address_v* remote,
     128                reboost::message_t message,
     129                uint8_t priority = 0 );
     130       
     131        /**
     132         * calls send for each destination endpoint in `endpoint_set& endpoints`
     133         */
     134        virtual void send(
     135                const endpoint_set& endpoints,
     136                reboost::message_t message,
     137                uint8_t priority = 0 );
     138       
    33139        virtual void terminate( const address_v* remote );
     140        virtual void terminate( const tcp::endpoint& remote );
    34141        virtual void register_listener( transport_listener* listener );
    35142
     143       
     144    /**
     145     *  returns a vector of (interesting) network interfaces
     146     * 
     147     *  [NOTE: The current implementation returns the scope_ids of
     148     *  all ethX and wlanX network interfaces, to be used for
     149     *  connections to link-local ipv6 addresses.]
     150     * 
     151     *  TODO move to ariba/communication/networkinfo/AddressDiscovery ??
     152     * 
     153     */
     154    static vector<uint64_t> get_interface_scope_ids();
     155
    36156private:
    37         volatile bool done, running;
    38         uint16_t port;
    39         pthread_t tpreceivethread;
    40         ThreadStarter<TPoverTCP, TPoverTCPParam>* tpthread;
    41         static void* receiverThread( void* ptp );
     157        void accept();
     158        void async_accept_handler(ConnPtr conn, const error_code& error);
     159        tcp::endpoint convert_address(const address_v* endpoint);
     160        tcpip_endpoint convert_address(const tcp::endpoint& endpoint);
     161       
     162private:
    42163        transport_listener* listener;
     164        unique_io_service u_io_service;
     165        tcp::acceptor acceptor;
     166       
     167        ConnectionMap connections;
     168        boost::mutex connections_lock;
    43169};
    44170
  • source/ariba/utility/transport/transport.hpp

    r5284 r10653  
    88// transport protocol implementations
    99#include "tcpip/tcpip.hpp"
    10 #include "rfcomm/rfcomm.hpp"
     10#include "rfcomm/rfcomm_transport.hpp"
    1111
    1212// common transport peer using all known protocols
  • source/ariba/utility/transport/transport_listener.hpp

    r5993 r10653  
    55
    66#include "ariba/utility/addressing/addressing.hpp"
     7#include "ariba/utility/transport/messages/buffers.hpp"
     8#include "ariba/utility/transport/transport_connection.hpp"
    79
    810// namespace ariba::transport
     
    1113
    1214using namespace ariba::addressing;
    13 
    14 class transport_protocol;
    1515
    1616/**
     
    2121class transport_listener {
    2222public:
     23    /// Allow deleting implementing classes by pointer
     24    virtual ~transport_listener() {}
     25   
    2326        /// called when a message is received
    2427        virtual void receive_message(
    25                 transport_protocol* transport,
    26                 const address_vf local, const address_vf remote,
    27                 const uint8_t* data, size_t size
     28        transport_connection::sptr connection,
     29                reboost::message_t msg
    2830        ) {
    2931                std::cout << "transport_listener: not implemented" << std::endl;
  • source/ariba/utility/transport/transport_peer.cpp

    r7834 r10653  
    33#include "transport_peer.hpp"
    44#include "transport.hpp"
     5#include "ariba/utility/logging/Logging.h"
     6#include <boost/asio/ip/tcp.hpp>
     7#include <boost/asio/error.hpp>
     8#include <boost/foreach.hpp>
     9
     10#ifdef ECLIPSE_PARSER
     11    #define foreach(a, b) for(a : b)
     12#else
     13    #define foreach(a, b) BOOST_FOREACH(a, b)
     14#endif
    515
    616// namespace ariba::transport
     
    919
    1020using namespace ariba::addressing;
     21using boost::asio::ip::tcp;
     22
     23#ifdef HAVE_LIBBLUETOOTH
     24using boost::asio::bluetooth::rfcomm;
     25#endif
     26
     27use_logging_cpp(transport_peer);
    1128
    1229transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    13         // setup tcp transports
    14         tcp = NULL;
    15         //cout << "#tcpip_transports = " << local.tcp.size() << endl;
    16         if (local.tcp.size()==1) {
    17                 tcp = new tcpip(local.tcp.begin()->value());
    18                 //cout << "Started tcpip_transport on port "  << local.tcp.begin()->value() << endl;
    19         }
    20 
     30   
     31    // setup tcp transports
     32    foreach(tcp_port_address port, local.tcp) {
     33       
     34        if (local.ip.size() > 0) {
     35            foreach(ip_address ip_addr, local.ip) {
     36               
     37                tcp::endpoint endp(ip_addr.asio(), port.asio());
     38                create_service(endp);
     39            }
     40        } else {
     41            tcp::endpoint endp_v6(tcp::v6(), port.asio());
     42            tcp::endpoint endp_v4(tcp::v4(), port.asio());
     43           
     44            create_service(endp_v6);
     45            create_service(endp_v4);
     46        }
     47       
     48    }
     49   
    2150        #ifdef HAVE_LIBBLUETOOTH
    22         // setup rfcomm transports
    23         rfc = NULL;
    24         //cout << "#rfcomm_transports = " << local.rfcomm.size() << endl;
    25         if ( local.rfcomm.size() == 1 ) {
    26                 rfc = new rfcomm( local.rfcomm.begin()->value() );
    27                 //cout << "Started rfcomm_transport on port "  << local.rfcomm.begin()->value() << endl;
    28         }
     51    foreach(rfcomm_channel_address channel, local.rfcomm) {
     52        if (local.bluetooth.size() > 0) {
     53                foreach(mac_address mac, local.bluetooth) {
     54                        rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     55                        create_service(endp);
     56                }
     57        } else {
     58                rfcomm::endpoint endp(channel.value());
     59                create_service(endp);
     60        }
     61    }
    2962        #endif
    3063}
    3164
     65void transport_peer::create_service(tcp::endpoint endp) {
     66    try {
     67        TcpIpPtr tmp_ptr(new tcpip(endp));
     68        tcps.push_back(tmp_ptr);
     69        logging_info("Listening on IP/TCP " << endp);
     70       
     71    } catch (boost::system::system_error& e) {
     72        if (e.code() == boost::asio::error::address_in_use) {
     73            logging_warn("[WARN] Address already in use: "
     74                    << endp << ". Endpoint will be ignored!");
     75        } else {
     76            // Rethrow
     77            throw;
     78        }
     79    }
     80}
     81
     82#ifdef HAVE_LIBBLUETOOTH
     83void transport_peer::create_service(rfcomm::endpoint endp) {
     84    try {
     85        rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
     86        rfcomms.push_back(tmp_ptr);
     87        logging_info("Listening on bluetooth/RFCOMM " << endp);
     88       
     89    } catch (boost::system::system_error& e) {
     90        if (e.code() == boost::asio::error::address_in_use) {
     91            logging_warn("[WARN] Address already in use: "
     92                    << endp << ". Endpoint will be ignored!");
     93        } else {
     94            // Rethrow
     95            throw;
     96        }
     97    }
     98}
     99#endif
     100
    32101transport_peer::~transport_peer() {
    33         if (tcp !=NULL ) delete tcp;
    34 #ifdef HAVE_LIBBLUETOOTH
    35         if (rfc !=NULL ) delete rfc;
    36 #endif
    37102}
    38103
    39104void transport_peer::start() {
    40         if (tcp!=NULL) tcp->start();
     105    foreach(TcpIpPtr tcp, tcps) {
     106        tcp->start();
     107    }
     108   
    41109#ifdef HAVE_LIBBLUETOOTH
    42         if (rfc!=NULL) rfc->start();
     110    foreach(rfcomm_transport::sptr x, rfcomms) {
     111        x->start();
     112    }
    43113#endif
    44114}
    45115
    46116void transport_peer::stop() {
    47         if (tcp!=NULL) tcp->stop();
     117    foreach(TcpIpPtr tcp, tcps) {
     118        tcp->stop();
     119    }
     120   
    48121#ifdef HAVE_LIBBLUETOOTH
    49         if (rfc!=NULL) rfc->stop();
     122        foreach(rfcomm_transport::sptr x, rfcomms) {
     123                x->stop();
     124        }
    50125#endif
    51126}
    52127
    53 void transport_peer::send( const address_v* remote, const uint8_t* data, size_t size ) {
    54         if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) {
    55                 tcp->send(remote,data,size);
    56         } else
     128
     129void transport_peer::send(
     130        const endpoint_set& endpoints,
     131        reboost::message_t message,
     132        uint8_t priority)
     133{
     134    foreach(TcpIpPtr tcp, tcps) {
     135        tcp->send(endpoints, message, priority);
     136    }
     137   
    57138#ifdef HAVE_LIBBLUETOOTH
    58         if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) {
    59                 rfc->send(remote,data,size);
    60         } else
    61 #endif
    62                 cerr << "Could not send message to " << remote->to_string() << endl;
    63 }
    64 
    65 void transport_peer::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
    66         if (tcp!=NULL) tcp->send(endpoints,data,size);
    67 #ifdef HAVE_LIBBLUETOOTH
    68         if (rfc!=NULL) rfc->send(endpoints,data,size);
     139    foreach(rfcomm_transport::sptr x, rfcomms) {
     140                x->send(endpoints, message, priority);
     141        }
    69142#endif
    70143}
    71144
    72145void transport_peer::terminate( const address_v* remote ) {
    73         if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL)
    74                 tcp->terminate(remote);
     146        if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     147        {
     148            foreach(TcpIpPtr tcp, tcps) {
     149                tcp->terminate(remote);
     150            }
     151        }
    75152#ifdef HAVE_LIBBLUETOOTH
    76         if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL)
    77                 rfc->terminate(remote);
     153        if (remote->instanceof<rfcomm_endpoint>()) {
     154                foreach(rfcomm_transport::sptr x, rfcomms) {
     155                        x->terminate(remote);
     156                }
     157        }
    78158#endif
    79159}
    80160
    81161void transport_peer::register_listener( transport_listener* listener ) {
    82         if (tcp!=NULL) tcp->register_listener(listener);
     162    foreach(TcpIpPtr tcp, tcps) {
     163        tcp->register_listener(listener);
     164    }
     165   
    83166#ifdef HAVE_LIBBLUETOOTH
    84         if (rfc!=NULL) rfc->register_listener(listener);
     167    foreach(rfcomm_transport::sptr x, rfcomms) {
     168        x->register_listener(listener);
     169    }
    85170#endif
    86171}
  • source/ariba/utility/transport/transport_peer.hpp

    r9324 r10653  
    55#include "transport_protocol.hpp"
    66#include "ariba/utility/addressing/endpoint_set.hpp"
     7#include <boost/shared_ptr.hpp>
     8#include "rfcomm/bluetooth_rfcomm.hpp"
     9
    710
    811// namespace ariba::transport
     
    1316
    1417class tcpip;
     18
    1519#ifdef HAVE_LIBBLUETOOTH
    16 class rfcomm;
     20class rfcomm_transport;
    1721#endif
    1822
     
    3034        virtual void start();
    3135        virtual void stop();
    32         virtual void send( const address_v* remote, const uint8_t* data, size_t size );
    33         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size );
     36       
     37        virtual void send(
     38                const endpoint_set& endpoints,
     39                reboost::message_t message,
     40                uint8_t priority = 0);
     41       
     42        /// @deprecated: Use terminate() from transport_connection instead
    3443        virtual void terminate( const address_v* remote );
     44       
    3545        virtual void register_listener( transport_listener* listener );
    3646
    3747private:
     48        void create_service(tcp::endpoint endp);
     49#ifdef HAVE_LIBBLUETOOTH
     50        void create_service(boost::asio::bluetooth::rfcomm::endpoint endp);
     51#endif
     52       
    3853        endpoint_set&  local;
    39         tcpip* tcp;
     54        std::vector< boost::shared_ptr<tcpip> > tcps;
    4055#ifdef HAVE_LIBBLUETOOTH
    41         rfcomm* rfc;
     56        std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms;
    4257#endif
    4358};
  • source/ariba/utility/transport/transport_protocol.hpp

    r5993 r10653  
    33
    44#include "ariba/utility/addressing/addressing.hpp"
    5 #include "transport_listener.hpp"
     5#include "ariba/utility/transport/transport_listener.hpp"
     6#include "ariba/utility/transport/messages/message.hpp"
    67
    78// namespace ariba::transport
     
    1819class transport_protocol {
    1920public:
     21    /// Allow deleting implementing classes by pointer
     22    virtual ~transport_protocol() {}
     23   
    2024        virtual void start() = 0;
    2125        virtual void stop() = 0;
    22         virtual void send( const address_v* remote, const uint8_t* data, size_t size ) = 0;
    23         virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) = 0;
     26       
     27        virtual void send(
     28                const endpoint_set& endpoints,
     29                reboost::message_t message,
     30                uint8_t priority = 0) = 0;
     31       
     32        /// @deprecated: Use terminate() from transport_connection instead
    2433        virtual void terminate( const address_v* remote ) = 0;
     34       
    2535        virtual void register_listener( transport_listener* listener ) = 0;
    2636};
  • source/ariba/utility/visual/DddVis.h

    r6954 r10653  
    5959using std::pair;
    6060using std::make_pair;
    61 using std::cout;
    6261using std::ostringstream;
    6362using ariba::utility::NodeID;
  • source/ariba/utility/visual/OvlVis.h

    r6822 r10653  
    5454using std::pair;
    5555using std::make_pair;
    56 using std::cout;
    5756using std::ostringstream;
    5857using ariba::utility::KeyMapping;
Note: See TracChangeset for help on using the changeset viewer.