Changeset 12060 for source/ariba/utility


Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source/ariba/utility
Files:
16 added
9 deleted
33 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/CMakeLists.txt

    r10700 r12060  
    4545add_subdir_sources(
    4646    addressing
     47    addressing2
    4748    bootstrap
    4849    configuration
    4950    internal
    5051    logging
    51     measurement
    5252    messages
    5353    misc
  • source/ariba/utility/addressing

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/addressing/endpoint_set.hpp

    r10653 r12060  
    1 #ifndef ENDPOINT_SET_HPP_
    2 #define ENDPOINT_SET_HPP_
     1#ifndef ENDPOINT_SET_HPP_DEPRECATED_
     2#define ENDPOINT_SET_HPP_DEPRECATED_
    33
    44#include "addressing.hpp"
  • source/ariba/utility/addressing/facades/address_v.hpp

    r10789 r12060  
    1313
    1414#include "../detail/address_convenience.hpp"
     15
     16#include <boost/shared_ptr.hpp>
    1517
    1618namespace ariba {
     
    2628class address_v: public detail::address_convenience<address_v> {
    2729public:
     30    typedef boost::shared_ptr<address_v> shared_ptr;
     31   
    2832        virtual ~address_v() {}
    2933       
  • source/ariba/utility/addressing/ip_address.hpp

    r6919 r12060  
    1 #ifndef IP_ADDRESS_HPP_
    2 #define IP_ADDRESS_HPP_
     1#ifndef IP_ADDRESS_HPP_DEPRECATED_
     2#define IP_ADDRESS_HPP_DEPRECATED_
    33
    44#include <string>
  • source/ariba/utility/addressing/tcpip_endpoint.hpp

    r5284 r12060  
    1 #ifndef TCPIP_ENDPOINT_HPP_
    2 #define TCPIP_ENDPOINT_HPP_
     1#ifndef TCPIP_ENDPOINT_HPP_DEPRECATED_
     2#define TCPIP_ENDPOINT_HPP_DEPRECATED_
    33
    44#include<string>
  • source/ariba/utility/bootstrap/modules/bluetoothsdp

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h

    r10653 r12060  
    4040#define __PERIODIC_BROADCAST_H
    4141
     42// ariba
    4243#include "ariba/config.h"
    43 
     44#include "ariba/utility/bootstrap/modules/BootstrapModule.h"
     45#include "ariba/utility/logging/Logging.h"
     46#include "ariba/utility/system/Timer.h"
     47
     48// ariba messages
     49#include "PeriodicBroadcastMessage.h"
     50
     51// (ariba) link-local
     52#include "ariba/utility/transport/StreamTransport/StreamTransport.hpp"
     53
     54// system
    4455#include <map>
    4556#include <string>
    4657#include <ctime>
    4758#include <iostream>
     59
     60// boost
    4861#include <boost/asio.hpp>
    4962#include <boost/foreach.hpp>
    5063#include <boost/thread/mutex.hpp>
    5164#include <boost/thread/thread.hpp>
    52 #include "ariba/utility/bootstrap/modules/BootstrapModule.h"
    53 #include "ariba/utility/logging/Logging.h"
    54 #include "ariba/utility/system/Timer.h"
    55 #include "PeriodicBroadcastMessage.h"
    56 
    57 //link-local
    58 #include "ariba/utility/transport/tcpip/tcpip.hpp"
     65
    5966
    6067using std::map;
     
    301308                               
    302309                                // include all link-local interfaces
    303                                 vector<uint64_t> scope_ids = ariba::transport::tcpip::get_interface_scope_ids();
     310                                vector<uint64_t> scope_ids = ariba::transport::get_interface_scope_ids();
    304311                               
    305312                                BOOST_FOREACH ( uint64_t id, scope_ids )
  • source/ariba/utility/logging/Logging.h

    r10700 r12060  
    9797
    9898  static int __loglevel__ = 2; //default is info
     99//  static int __loglevel__ = 1; // XXX use higher log level
    99100
    100101  #define logging_trace(x)  {                                   logging_stdout(x);                }
  • source/ariba/utility/messages.h

    r3690 r12060  
    4040#define MESSAGES_H_
    4141
     42// TODO wÃŒrde sagen das brauchen wir nicht mehr
     43//   ---> ÃŒberall dieses include rausnehmen und ggf  #include "messages/Message.h"   einfÃŒgen..
     44
    4245#include "messages/Message.h"
    43 #include "messages/MessageSender.h"
    44 #include "messages/MessageReceiver.h"
    45 #include "messages/MessageUtilities.h"
    46 #include "messages/MessageProvider.h"
    47 #include "messages/TextMessage.h"
     46//#include "messages/MessageSender.h"  // TODO wird das noch genutzt..? Wenn nein, sollte es weg!
     47//#include "messages/MessageReceiver.h"
     48//#include "messages/MessageUtilities.h"
     49////#include "messages/MessageProvider.h"  // DEPRECATED
     50//#include "messages/TextMessage.h"
    4851
    4952#endif /* MESSAGES_H_ */
  • source/ariba/utility/messages/Message.cpp

    r8620 r12060  
    4141#include "ariba/utility/serialization/DataStream.hpp"
    4242
     43#include "ariba/utility/logging/Logging.h"
     44
    4345NAMESPACE_BEGIN
    4446
     
    8082}
    8183
     84
     85reboost::message_t Message::wrap_up_for_sending()
     86{
     87    assert( ! wrapped_up );
     88    wrapped_up = true;
     89   
     90    //// Adapt to new message system ////
     91    Data data = data_serialize(this, DEFAULT_V);
     92    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     93
     94    newstyle_payload.push_front(buf);
     95   
     96    return newstyle_payload;
     97}
     98
     99reboost::shared_buffer_t Message::serialize_into_shared_buffer()
     100{
     101    assert ( newstyle_payload.length() == 0 );
     102   
     103    //// Adapt to new message system ////
     104    Data data = data_serialize(this, DEFAULT_V);
     105    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     106   
     107    return buf;
     108}
     109
     110
     111reboost::shared_buffer_t Message::deserialize_from_shared_buffer(reboost::shared_buffer_t buff)
     112{
     113    // NOTE: legacy payload is not allowed when using shared buffers
     114    this->legacy_payload_disabled = true;
     115   
     116    assert( buff.size() > 0 );
     117   
     118    // const_cast is necessary here, but without legacy payload we should be save here (more or less)
     119    Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8);
     120   
     121    size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8;
     122
     123    // return remaining sub-buffer
     124    return buff(len);
     125}
     126
     127
     128
    82129NAMESPACE_END
    83130
    84131std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg ) {
    85132        using_serialization;
    86         stream << "msg(type=" << typeid(msg).name();
     133        stream << "msg(type=" << typeid(msg).name() << ",";
    87134        stream << "len=" << (data_length(&msg)/8) << ",";
    88135        Data data = data_serialize(&msg);
    89         stream << ",data=" << data;
     136        stream << "data=" << data;
    90137        data.release();
    91138        stream << ")";
    92139        return stream;
    93140}
    94 
  • source/ariba/utility/messages/Message.h

    r6919 r12060  
    6262#include "ariba/utility/serialization.h"
    6363
     64// reboost messages
     65#include "ariba/utility/transport/messages/message.hpp"
     66
     67
    6468std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    6569
     
    8286        friend std::ostream& ::operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    8387
    84         // root binary data
    85         shared_array<uint8_t> root;
    86 
    8788        // payload
     89        bool legacy_payload_disabled;
    8890        bool releasePayload;
    8991        Data payload; //< messages binary data
     92       
     93        // XXX testing...
     94        reboost::message_t newstyle_payload;
     95        bool wrapped_up;
    9096
    9197        // addresses and control info
     
    98104         */
    99105        inline Message() :
    100                 root(), releasePayload(true), payload(), srcAddr(NULL),destAddr(NULL) {
     106            legacy_payload_disabled(false), releasePayload(true), payload(),
     107            newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {
    101108        }
    102109
     
    106113         */
    107114        explicit inline Message( const Data& data ) :
    108                 releasePayload(true), srcAddr(NULL),destAddr(NULL) {
     115        legacy_payload_disabled(false), releasePayload(true),
     116        newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {  // FIXME newstyle_payload..?
    109117                this->payload = data.clone();
    110118//              this->root = shared_array<uint8_t>((uint8_t*)data.getBuffer());
     
    225233                return decapsulate<T>();
    226234        }
     235       
     236       
     237        // XXX testing
     238        void set_payload_message(reboost::message_t msg)
     239        {
     240            newstyle_payload = msg;
     241        }
     242       
     243        void append_buffer(reboost::shared_buffer_t buff)
     244        {
     245            newstyle_payload.push_back(buff);
     246        }
     247       
     248       
     249        // XXX testing... packs this message into the payload message (do not use twice!!)
     250        virtual reboost::message_t wrap_up_for_sending();
     251       
     252       
     253        /**
     254         * Uses the old serialization system to serialize itself into a (new style) shared buffer.
     255         */
     256        virtual reboost::shared_buffer_t serialize_into_shared_buffer();
     257       
     258        /*
     259         * XXX experimental
     260         *
     261         * Uses the old serialization system to deserialize itself out of a (new style) shared buffer.
     262         * @return remaining sub-buffer (the "payload")
     263         *
     264         * Note: This is some kind of a hack! handle with care.
     265         */
     266        virtual reboost::shared_buffer_t deserialize_from_shared_buffer(reboost::shared_buffer_t buff);
     267       
    227268
    228269protected:
     
    262303         * @return A explicit payload serializer
    263304         */
    264         finline PayloadSerializer Payload( size_t length = ~0 ) {
     305        finline PayloadSerializer Payload( size_t length = ~0 )
     306        {
     307//          assert( ! legacy_payload_disabled );  // FIXME aktuell
     308           
    265309                return PayloadSerializer( this, length );
    266310        }
  • source/ariba/utility/messages/MessageProvider.cpp

    r6919 r12060  
    3737// [License]
    3838
    39 #include "MessageProvider.h"
    40 
    41 NAMESPACE_BEGIN
    42 
    43 MessageProvider::MessageProvider() {
    44 }
    45 
    46 MessageProvider::~MessageProvider() {
    47 }
    48 
    49 bool MessageProvider::sendMessageToReceivers( const Message* message ) {
    50         bool sent =  false;
    51         for (size_t i=0; i<receivers.size(); i++)
    52                 if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
    53         return sent;
    54 }
    55 
    56 void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
    57         receivers.push_back(receiver);
    58 }
    59 
    60 void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
    61         for (size_t i=0; i<receivers.size(); i++)
    62                 if (receivers[i]==receiver) {
    63                         receivers.erase( receivers.begin()+i );
    64                         break;
    65                 }
    66 }
    67 
    68 NAMESPACE_END
     39//#include "MessageProvider.h"
     40//
     41//NAMESPACE_BEGIN
     42//
     43//MessageProvider::MessageProvider() {
     44//}
     45//
     46//MessageProvider::~MessageProvider() {
     47//}
     48//
     49//bool MessageProvider::sendMessageToReceivers( const Message* message ) {
     50//      bool sent =  false;
     51//      for (size_t i=0; i<receivers.size(); i++)
     52//              if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
     53//      return sent;
     54//}
     55//
     56//void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
     57//      receivers.push_back(receiver);
     58//}
     59//
     60//void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
     61//      for (size_t i=0; i<receivers.size(); i++)
     62//              if (receivers[i]==receiver) {
     63//                      receivers.erase( receivers.begin()+i );
     64//                      break;
     65//              }
     66//}
     67//
     68//NAMESPACE_END
  • source/ariba/utility/messages/MessageProvider.h

    r3690 r12060  
     1// XXX DEPRECATED
     2
    13// [License]
    24// The Ariba-Underlay Copyright
     
    3739// [License]
    3840
     41// XXX DEPRECATED
     42
    3943#ifndef MESSAGEPROVIDER_H_
    4044#define MESSAGEPROVIDER_H_
    4145
    42 #include "_namespace.h"
    43 #include "MessageReceiver.h"
    44 #include "ariba/utility/types/LinkID.h"
    45 #include "ariba/utility/types/NodeID.h"
    46 #include <vector>
    47 
    48 using std::vector;
    49 using ariba::utility::LinkID;
    50 using ariba::utility::NodeID;
    51 
    52 NAMESPACE_BEGIN
    53 
    54 
    55 /**
    56  * This class defines an interface for message providers.
    57  * Implementing classes must allow receivers to register themselves.
    58  *
    59  * @author Sebastian Mies
    60  */
    61 class MessageProvider {
    62 private:
    63         vector<MessageReceiver*> receivers;
    64 
    65 protected:
    66         bool sendMessageToReceivers( const Message* message );
    67 
    68 public:
    69         /**
    70          * Constructor.
    71          */
    72         MessageProvider();
    73 
    74         /**
    75          * Destructor.
    76          */
    77         ~MessageProvider();
    78 
    79         /**
    80          * Adds a message receiver.
    81          *
    82          * @param receiver The receiver.
    83          */
    84         void addMessageReceiver( MessageReceiver* receiver );
    85 
    86         /**
    87          * Removes a message receiver.
    88          *
    89          * @param receiver The receiver.
    90          */
    91         void removeMessageReceiver( MessageReceiver* receiver );
    92 };
    93 
    94 NAMESPACE_END
     46//#include "_namespace.h"
     47//#include "MessageReceiver.h"
     48//#include "ariba/utility/types/LinkID.h"
     49//#include "ariba/utility/types/NodeID.h"
     50//#include <vector>
     51//
     52//using std::vector;
     53//using ariba::utility::LinkID;
     54//using ariba::utility::NodeID;
     55//
     56//NAMESPACE_BEGIN
     57//
     58//
     59///**
     60// * This class defines an interface for message providers.
     61// * Implementing classes must allow receivers to register themselves.
     62// *
     63// * @author Sebastian Mies
     64// */
     65//class MessageProvider {
     66//private:
     67//      vector<MessageReceiver*> receivers;
     68//
     69//protected:
     70//      bool sendMessageToReceivers( const Message* message );
     71//
     72//public:
     73//      /**
     74//       * Constructor.
     75//       */
     76//      MessageProvider();
     77//
     78//      /**
     79//       * Destructor.
     80//       */
     81//      ~MessageProvider();
     82//
     83//      /**
     84//       * Adds a message receiver.
     85//       *
     86//       * @param receiver The receiver.
     87//       */
     88//      void addMessageReceiver( MessageReceiver* receiver );
     89//
     90//      /**
     91//       * Removes a message receiver.
     92//       *
     93//       * @param receiver The receiver.
     94//       */
     95//      void removeMessageReceiver( MessageReceiver* receiver );
     96//};
     97//
     98//NAMESPACE_END
    9599
    96100#endif /* MESSAGEPROVIDER_H_ */
  • source/ariba/utility/messages/MessageReceiver.cpp

    r3690 r12060  
    4949}
    5050
    51 bool MessageReceiver::receiveMessage( const Message* message, const LinkID& link, const NodeID& node ) {
    52         //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
    53         return false;
    54 }
     51//bool MessageReceiver::receiveMessage( reboost::shared_buffer_t message, const LinkID& link, const NodeID& node ) {
     52//      //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
     53//      return false;
     54//}
    5555
    5656NAMESPACE_END
  • source/ariba/utility/messages/MessageReceiver.h

    r3690 r12060  
    4040#define MESSAGERECEIVER_H__
    4141
    42 #include "ariba/utility/messages/Message.h"
     42//#include "ariba/utility/messages/Message.h"
     43// reboost messages
     44#include "ariba/utility/transport/messages/message.hpp"
    4345#include "ariba/utility/types/LinkID.h"
    4446#include "ariba/utility/types/NodeID.h"
     
    7375         * @return True, when the message has been accepted.
    7476         */
    75         virtual bool receiveMessage( const Message* message, const LinkID& link, const NodeID& node );
     77        virtual bool receiveMessage( reboost::shared_buffer_t message,
     78                const LinkID& link,
     79                const NodeID& node,
     80                bool bypass_overlay ) = 0;
    7681};
    7782
  • source/ariba/utility/misc/Demultiplexer.hpp

    r3705 r12060  
    136136
    137137                SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id );
    138                 if( it == mapServiceListener.end() )    return NULL;
     138                if( it == mapServiceListener.end() )    return 0;
    139139                else                                    return it->second;
    140140        }
  • source/ariba/utility/system/StartupWrapper.cpp

    r10700 r12060  
    3636// Telematics.
    3737// [License]
     38
     39// XXX NOTE: Use this class with caution! Config support is outdated.
    3840
    3941#include "StartupWrapper.h"
  • source/ariba/utility/system/StartupWrapper.h

    r4483 r12060  
    3636// Telematics.
    3737// [License]
     38
     39// XXX NOTE: Use this class with caution! Config support is outdated.
    3840
    3941#ifndef __STARTUP_WRAPPER_H
  • source/ariba/utility/system/SystemQueue.cpp

    r7533 r12060  
    6969}
    7070
     71// maps to function call internally to the Event-system
     72void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
     73{
     74    // copy function object
     75    boost::function0<void>* function_ptr = new boost::function0<void>();
     76    (*function_ptr) = function;
     77
     78    // schedule special call-event
     79    scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
     80
     81}
     82
    7183#ifdef UNDERLAY_OMNET
    7284void SystemQueue::handleMessage(cMessage* msg){
  • source/ariba/utility/system/SystemQueue.h

    r7468 r12060  
    6060#endif
    6161
     62#include <boost/function.hpp>
     63
     64
    6265using std::vector;
    6366using boost::posix_time::ptime;
     
    108111         */
    109112        void scheduleEvent( const SystemEvent& event, uint32_t delay = 0 );
     113       
     114        /**
     115         * This method schedules a function call in the SystemQueue.
     116         * (Like scheduleEvent, but to be used with boost::bind.)
     117         *
     118         * @param function: The function to be called [void function()]
     119         * @param The delay in milli-seconds
     120         */
     121        void scheduleCall( const boost::function0<void>& function, uint32_t delay = 0 );
    110122
    111123        /**
     
    170182#endif
    171183
     184       
     185       
    172186private:
    173187
     
    235249        volatile bool systemQueueRunning;
    236250#endif
    237 
     251       
     252       
     253private:
     254    /**
     255     * This inner class handles the function-call events.
     256     * @see SystemQueue::scheduleCall
     257     */
     258    class FunctionCaller  :  public SystemEventListener
     259    {
     260        void handleSystemEvent(const SystemEvent& event)
     261        {
     262            boost::function0<void>* function_ptr = event.getData< boost::function0<void> >();
     263            (*function_ptr)();
     264            delete function_ptr;
     265        }
     266    };
     267
     268    FunctionCaller internal_function_caller;
    238269}; // class SystemQueue
    239270
  • source/ariba/utility/transport

    • Property svn:mergeinfo changed (with no actual effect on merging)
  • source/ariba/utility/transport/CMakeLists.txt

    r10700 r12060  
    3838
    3939add_headers(
    40     test_transport.hpp
    41     transport_connection.hpp
    42     transport.hpp
    43     transport_listener.hpp
    4440    transport_peer.cpp
    4541    transport_peer.hpp
    46     transport_protocol.hpp
    4742    )
    4843
    49 add_subdir_sources(asio messages rfcomm tcpip)
     44add_subdir_sources(asio messages rfcomm StreamTransport)
     45
  • source/ariba/utility/transport/messages/message.cpp

    r10653 r12060  
    2424        os << "message({size=" << m.size() << ",buffers=" << (int) m.length()
    2525                        << ",hash=" << m.hash() << "},";
    26         m.foreach(ts);
     26        m.msg_foreach(ts);
    2727        os << ")";
    2828        return os;
  • source/ariba/utility/transport/messages/message.hpp

    r10653 r12060  
    1717
    1818/// message size type
    19 typedef signed char mlength_t;
     19//typedef signed char mlength_t;  // <--- don't do this!!
     20//typedef size_t mlength_t;
     21typedef int mlength_t;  // signed int seems necessary
    2022
    2123/// maximum number of buffers per message (default is 8)
    2224const mlength_t message_max_buffers = (1L << 3);
     25//const mlength_t message_max_buffers = (1L << 4);
    2326
    2427//! A Copy-on-Write Message with Shared Buffers.
     
    7073        /// Copy message
    7174        inline message_t(const message_t& msg) :
    72                 imsg(msg.imsg) {
    73                 imsg->owner = NULL;
     75                imsg(msg.imsg)
     76        {
     77            if ( imsg )
     78                imsg->owner = NULL;
    7479        }
    7580
     
    142147        /// Returns the number of buffers inside this message.
    143148        inline mlength_t length() const {
     149            if ( ! imsg )
     150                return 0;
     151           
    144152                return (imsg->length);
    145153        }
     
    167175        /// Iterates over a partial set of buffers.
    168176        template<typename T>
    169         inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
     177        inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
    170178                T op = work;
    171179                if (size_ == 0) size_ = size() - index_;
     
    192200        inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
    193201                struct read_buffer rb = { mem };
    194                 foreach(rb, idx, size_);
     202                msg_foreach(rb, idx, size_);
    195203        }
    196204
     
    198206        inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
    199207                struct write_buffer wb = { mem };
    200                 foreach(wb, idx, size_);
     208                msg_foreach(wb, idx, size_);
    201209        }
    202210
     
    227235                message_t m;
    228236                struct sub_message sm = { &m };
    229                 foreach(sm, index, size);
     237                msg_foreach(sm, index, size);
    230238                return m;
    231239        }
  • source/ariba/utility/transport/messages/shared_buffer.hpp

    r10700 r12060  
    99
    1010#include <cstring>
     11#include <string>
    1112#include <boost/shared_ptr.hpp>
    1213
     
    1819#include "buffer.hpp"
    1920
     21#include <stdexcept>
     22
    2023namespace reboost {
     24
     25class illegal_sub_buffer: public std::runtime_error
     26{
     27public:
     28    /** Takes a character string describing the error.  */
     29    explicit illegal_sub_buffer(const std::string& __arg)  :
     30        std::runtime_error(__arg)
     31    {
     32    }
     33   
     34    virtual ~illegal_sub_buffer() throw() {}
     35};
    2136
    2237/**
     
    104119                parent(new deleteable_buffer(buffer, size))
    105120        {
     121        }
     122
     123//    /// XXX debug... copy!
     124//      /// create shared buffer from buffer
     125//      inline shared_buffer_t(const char* buffer, bsize_t size) :
     126//              buffer_t(), parent(new deleteable_buffer(size)) {
    106127//              memcpy(parent->mutable_data(), buffer, parent->size());
    107 //              data(parent->mutable_data());
    108 //              this->size(parent->size());
    109         }
     128//              data(parent->mutable_data()); this->size(parent->size());
     129//      }
    110130
    111131        /// clone data from a normal buffer
     
    129149
    130150        /// return sub-buffer.
    131         inline self operator()(bsize_t index, bsize_t size = 0) const {
     151        inline self operator()(bsize_t index, bsize_t size = 0) const
     152        {
     153            // special cases
     154            if ( index + size > size_ )
     155            {
     156                // empty sub-buffer
     157            if ( index == size_ )
     158            {
     159                self n;
     160                return n;
     161            }
     162         
     163            // ERROR: index out of bounds
     164            throw illegal_sub_buffer("Index or size out of bounds in shared_buffer");
     165            }
     166
     167            // regular case
    132168                self n(*this);
    133169                n.data_ += index;
  • source/ariba/utility/transport/rfcomm/CMakeLists.txt

    r10700 r12060  
    4040    bluetooth_endpoint.hpp
    4141    bluetooth_rfcomm.hpp
    42     rfcomm_transport.hpp
    4342    )
    4443
    45 add_sources(rfcomm_transport.cpp)
     44#add_sources()
     45
  • source/ariba/utility/transport/transport_peer.cpp

    r10700 r12060  
    1 
    2 #include "ariba/config.h"
    31#include "transport_peer.hpp"
    4 #include "transport.hpp"
    5 #include <boost/asio/ip/tcp.hpp>
     2
     3// ariba
     4#include "StreamTransport/StreamTransport.hpp"
     5#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     6
     7// boost
    68#include <boost/asio/error.hpp>
    79#include <boost/foreach.hpp>
    8 
    9 #ifdef ECLIPSE_PARSER
    10     #define foreach(a, b) for(a : b)
    11 #else
    12     #define foreach(a, b) BOOST_FOREACH(a, b)
    13 #endif
    1410
    1511// namespace ariba::transport
     
    1713namespace transport {
    1814
    19 using namespace ariba::addressing;
     15using namespace addressing2;
    2016using boost::asio::ip::tcp;
    2117
     
    2622use_logging_cpp(transport_peer);
    2723
    28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) {
    29    
    30     // setup tcp transports
    31     foreach(tcp_port_address port, local.tcp) {
     24transport_peer::transport_peer()  :
     25        local(new addressing2::endpoint_set())
     26{
     27}
     28
     29EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints)
     30{
     31    // TCP Endpoints
     32    BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() )
     33    {
     34        // automatic port detection
     35        bool port_detection = false;
     36        uint16_t try_port = 41322;
    3237       
    33         if (local.ip.size() > 0) {
    34             foreach(ip_address ip_addr, local.ip) {
    35                
    36                 tcp::endpoint endp(ip_addr.asio(), port.asio());
    37                 create_service(endp);
    38             }
    39         } else {
    40             tcp::endpoint endp_v6(tcp::v6(), port.asio());
    41             tcp::endpoint endp_v4(tcp::v4(), port.asio());
    42            
    43             create_service(endp_v6);
    44             create_service(endp_v4);
     38        tcp::endpoint asio_endp = endp->to_asio();
     39        if ( asio_endp.port() == 0 )
     40        {
     41            port_detection = true;
    4542        }
    4643       
    47     }
    48    
     44       
     45        // create new server socket
     46        do
     47        {
     48            try
     49            {
     50                // automatic port detection
     51                if ( port_detection )
     52                {
     53                    asio_endp.port(try_port);
     54                    endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp);
     55                }
     56               
     57                TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio()));
     58                transport_streams.push_back(tmp_ptr);
     59                logging_info("Listening on IP/TCP " << endp->to_string());
     60               
     61                local->add_endpoint(endp);
     62                port_detection = false;
     63            }
     64           
     65            catch (boost::system::system_error& e)
     66            {
     67                // address in use
     68                if (e.code() == boost::asio::error::address_in_use)
     69                {
     70                    // BRANCH: automatic port detection
     71                    if ( port_detection )
     72                    {
     73                        // give up ?
     74                        if ( try_port > 41422 )
     75                        {
     76                            logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: "
     77                                << endp->to_string() << ". Endpoint will be ignored!");
     78   
     79                            port_detection = false;
     80                        }
     81                        else
     82                        {
     83                            // try next port
     84                            try_port++;
     85                        }
     86                    }
     87                    // BRANCH: explicit given port --> error
     88                    else
     89                    {
     90                        logging_warn("[WARN] Address already in use: "
     91                            << endp->to_string() << ". Endpoint will be ignored!");
     92                    }
     93                }
     94   
     95                // Rethrow
     96                else
     97                {
     98                    throw;
     99                }
     100            }
     101        } while ( port_detection );
     102    }
     103   
     104    // TODO Bluetooth Endpoints
    49105        #ifdef HAVE_LIBBLUETOOTH
    50     foreach(rfcomm_channel_address channel, local.rfcomm) {
    51         if (local.bluetooth.size() > 0) {
    52                 foreach(mac_address mac, local.bluetooth) {
    53                         rfcomm::endpoint endp(mac.bluetooth(), channel.value());
    54                         create_service(endp);
    55                 }
    56         } else {
    57                 rfcomm::endpoint endp(channel.value());
    58                 create_service(endp);
    59         }
    60     }
     106//    foreach(rfcomm_channel_address channel, local.rfcomm) {
     107//      if (local.bluetooth.size() > 0) {
     108//              foreach(mac_address mac, local.bluetooth) {
     109//                      rfcomm::endpoint endp(mac.bluetooth(), channel.value());
     110//                      create_service(endp);
     111//              }
     112//      } else {
     113//              rfcomm::endpoint endp(channel.value());
     114//              create_service(endp);
     115//      }
     116//    }
    61117        #endif
    62 }
    63 
    64 void transport_peer::create_service(tcp::endpoint endp) {
    65     try {
    66         TcpIpPtr tmp_ptr(new tcpip(endp));
    67         tcps.push_back(tmp_ptr);
    68         logging_info("Listening on IP/TCP " << endp);
    69        
    70     } catch (boost::system::system_error& e) {
    71         if (e.code() == boost::asio::error::address_in_use) {
    72             logging_warn("[WARN] Address already in use: "
    73                     << endp << ". Endpoint will be ignored!");
    74         } else {
    75             // Rethrow
    76             throw;
    77         }
    78     }
    79 }
     118   
     119    return local;
     120}
     121
     122//void transport_peer::create_service(tcp::endpoint endp) {
     123//    try {
     124//        TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp));
     125//        tcps.push_back(tmp_ptr);
     126//        logging_info("Listening on IP/TCP " << endp);
     127//       
     128//    } catch (boost::system::system_error& e) {
     129//        if (e.code() == boost::asio::error::address_in_use) {
     130//            logging_warn("[WARN] Address already in use: "
     131//                    << endp << ". Endpoint will be ignored!");
     132//        } else {
     133//            // Rethrow
     134//            throw;
     135//        }
     136//    }
     137//}
    80138
    81139#ifdef HAVE_LIBBLUETOOTH
    82 void transport_peer::create_service(rfcomm::endpoint endp) {
    83     try {
    84         rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));
    85         rfcomms.push_back(tmp_ptr);
    86         logging_info("Listening on bluetooth/RFCOMM " << endp);
    87        
    88     } catch (boost::system::system_error& e) {
    89         if (e.code() == boost::asio::error::address_in_use) {
    90             logging_warn("[WARN] Address already in use: "
    91                     << endp << ". Endpoint will be ignored!");
    92         } else {
    93             // Rethrow
    94             throw;
    95         }
    96     }
    97 }
     140//void transport_peer::create_service(rfcomm::endpoint endp) {
     141//    try {
     142//        TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp));
     143//        rfcomms.push_back(tmp_ptr);
     144//        logging_info("Listening on bluetooth/RFCOMM " << endp);
     145//       
     146//    } catch (boost::system::system_error& e) {
     147//        if (e.code() == boost::asio::error::address_in_use) {
     148//            logging_warn("[WARN] Address already in use: "
     149//                    << endp << ". Endpoint will be ignored!");
     150//        } else {
     151//            // Rethrow
     152//            throw;
     153//        }
     154//    }
     155//}
    98156#endif
    99157
     
    101159}
    102160
    103 void transport_peer::start() {
    104     foreach(TcpIpPtr tcp, tcps) {
    105         tcp->start();
    106     }
    107    
    108 #ifdef HAVE_LIBBLUETOOTH
    109     foreach(rfcomm_transport::sptr x, rfcomms) {
    110         x->start();
    111     }
    112 #endif
    113 }
    114 
    115 void transport_peer::stop() {
    116     foreach(TcpIpPtr tcp, tcps) {
    117         tcp->stop();
    118     }
    119    
    120 #ifdef HAVE_LIBBLUETOOTH
    121         foreach(rfcomm_transport::sptr x, rfcomms) {
    122                 x->stop();
    123         }
    124 #endif
     161void transport_peer::start()
     162{
     163    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     164    {
     165        stream->start();
     166    }
     167}
     168
     169void transport_peer::stop()
     170{
     171    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     172    {
     173        stream->stop();
     174    }
    125175}
    126176
    127177
    128178void transport_peer::send(
    129         const endpoint_set& endpoints,
     179        const const_EndpointSetPtr endpoints,
    130180        reboost::message_t message,
    131181        uint8_t priority)
    132182{
    133     foreach(TcpIpPtr tcp, tcps) {
    134         tcp->send(endpoints, message, priority);
    135     }
    136    
    137 #ifdef HAVE_LIBBLUETOOTH
    138     foreach(rfcomm_transport::sptr x, rfcomms) {
    139                 x->send(endpoints, message, priority);
    140         }
    141 #endif
    142 }
    143 
    144 void transport_peer::terminate( const address_v* remote ) {
    145         if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
    146         {
    147             foreach(TcpIpPtr tcp, tcps) {
    148                 tcp->terminate(remote);
    149             }
    150         }
    151 #ifdef HAVE_LIBBLUETOOTH
    152         if (remote->instanceof<rfcomm_endpoint>()) {
    153                 foreach(rfcomm_transport::sptr x, rfcomms) {
    154                         x->terminate(remote);
    155                 }
    156         }
    157 #endif
    158 }
    159 
    160 void transport_peer::register_listener( transport_listener* listener ) {
    161     foreach(TcpIpPtr tcp, tcps) {
    162         tcp->register_listener(listener);
    163     }
    164    
    165 #ifdef HAVE_LIBBLUETOOTH
    166     foreach(rfcomm_transport::sptr x, rfcomms) {
    167         x->register_listener(listener);
    168     }
    169 #endif
     183    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     184    {
     185        stream->send(endpoints, message, priority);
     186    }
     187}
     188
     189// XXX DEPRECATED
     190//void transport_peer::terminate( const address_v* remote ) {
     191//      if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung
     192//      {
     193//          foreach(TransportProtocolPtr tcp, tcps) {
     194//              tcp->terminate(remote);
     195//          }
     196//      }
     197//#ifdef HAVE_LIBBLUETOOTH
     198//      if (remote->instanceof<rfcomm_endpoint>()) {
     199//              foreach(TransportProtocolPtr x, rfcomms) {
     200//                      x->terminate(remote);
     201//              }
     202//      }
     203//#endif
     204//}
     205
     206void transport_peer::register_listener( transport_listener* listener )
     207{
     208    BOOST_FOREACH(TransportProtocolPtr stream, transport_streams)
     209    {
     210        stream->register_listener(listener);
     211    }
    170212}
    171213
  • source/ariba/utility/transport/transport_peer.hpp

    r10700 r12060  
    22#define TRANSPORT_PEER_HPP_
    33
     4// ariba
    45#include "ariba/config.h"
    56#include "ariba/utility/logging/Logging.h"
    6 #include "transport_protocol.hpp"
    7 #include "ariba/utility/addressing/endpoint_set.hpp"
     7#include "ariba/utility/addressing2/endpoint_set.hpp"
     8
     9// ariba interfaces
     10#include "interfaces/transport_protocol.hpp"
     11
     12// boost
    813#include <boost/shared_ptr.hpp>
    9 #include "rfcomm/bluetooth_rfcomm.hpp"
     14
     15// boost-adaption
     16//#include "rfcomm/bluetooth_rfcomm.hpp"
    1017
    1118
     
    1421namespace transport {
    1522
    16 using namespace ariba::addressing;
    17 
    18 class tcpip;
    19 
    20 #ifdef HAVE_LIBBLUETOOTH
    21 class rfcomm_transport;
    22 #endif
    23 
    2423/**
    25  * TODO: Doc
     24 * This class allocates implementations of various transport
     25 * protocols and can send messages to an entire set of endpoints
    2626 *
    27  * @author Sebastian Mies <mies@tm.uka.de>
     27 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock
    2828 */
    29 /// this transport peer allocates implementations of various transport
    30 /// protocols and can send messages to an entire set of endpoints
    31 class transport_peer : public transport_protocol {
     29class transport_peer :
     30    public transport_protocol
     31{
    3232        use_logging_h(transport_peer);
     33        typedef boost::shared_ptr<transport_protocol> TransportProtocolPtr;
     34       
    3335public:
    34         transport_peer( endpoint_set& local_set );
     36        transport_peer();
     37       
     38        /**
     39         * Adds endpoints on which ariba should listen ("server"-sockets)
     40         *
     41         * @return An endpoint_set holding all active endpoints ariba is listening on.   
     42         */
     43        addressing2::EndpointSetPtr add_listenOn_endpoints(addressing2::EndpointSetPtr endpoints);
     44       
    3545        virtual ~transport_peer();
    3646        virtual void start();
     
    3848       
    3949        virtual void send(
    40                 const endpoint_set& endpoints,
     50                const addressing2::const_EndpointSetPtr endpoints,
    4151                reboost::message_t message,
    42                 uint8_t priority = 0);
    43        
    44         /// @deprecated: Use terminate() from transport_connection instead
    45         virtual void terminate( const address_v* remote );
    46        
     52                uint8_t priority = system_priority::OVERLAY);
     53               
    4754        virtual void register_listener( transport_listener* listener );
    4855
     56       
    4957private:
    50         void create_service(tcp::endpoint endp);
    51 #ifdef HAVE_LIBBLUETOOTH
    52         void create_service(boost::asio::bluetooth::rfcomm::endpoint endp);
    53 #endif
    54        
    55         endpoint_set&  local;
    56         std::vector< boost::shared_ptr<tcpip> > tcps;
    57 #ifdef HAVE_LIBBLUETOOTH
    58         std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms;
    59 #endif
     58        addressing2::EndpointSetPtr local;
     59        std::vector<TransportProtocolPtr> transport_streams;
    6060};
    6161
  • source/ariba/utility/types/Identifier.h

    r4625 r12060  
    5050#include "ariba/utility/serialization.h"
    5151
     52// XXX EXPERIMENTAL
     53#include "ariba/utility/transport/messages/shared_buffer.hpp"
     54
    5255/**< maximum length of the key */
    5356#define MAX_KEYLENGTH 192
     
    6972        use_logging_h( Identifier );
    7073public:
     74       
     75        // XXX EXPERIMENTAL
     76        reboost::shared_buffer_t serialize() const
     77        {
     78            Data data = data_serialize(this, DEFAULT_V);
     79            reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     80           
     81            return buf;
     82        }
     83       
     84        reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
     85        {
     86            Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8);
     87           
     88            size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8;
     89
     90            // return remaining sub-buffer
     91            return buff(len);
     92        }
     93       
    7194
    7295        //-------------------------------------------------------------------------
  • source/ariba/utility/types/OverlayParameterSet.h

    r4625 r12060  
    5353
    5454        typedef enum _OverlayStructure {
    55                 OverlayStructureOneHop   = 0,
     55                OverlayStructureOneHop   = 0,   // DEPRECATED, DO NOT USE
    5656                OverlayStructureChord    = 1,
    5757        } OverlayStructure;
  • source/ariba/utility/types/ServiceID.cpp

    r3690 r12060  
    7676}
    7777
    78 string ServiceID::toString() const {
     78string ServiceID::toString() const
     79{
     80    if ( *this == ServiceID::UNSPECIFIED )
     81        return "UNSPEC";
     82   
    7983        return ariba::utility::Helper::ultos( id );
    8084}
Note: See TracChangeset for help on using the changeset viewer.