Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/communication/BaseCommunication.h

    r10653 r12060  
    5050#include <boost/foreach.hpp>
    5151
     52#ifdef ECLIPSE_PARSER
     53    #define foreach(a, b) for(a : b)
     54#else
     55    #define foreach(a, b) BOOST_FOREACH(a, b)
     56#endif
     57
    5258// utilities
    5359#include "ariba/utility/types.h"
    54 #include "ariba/utility/messages.h"
     60#include "ariba/utility/messages/MessageReceiver.h"
    5561#include "ariba/utility/logging/Logging.h"
    5662#include "ariba/utility/misc/Demultiplexer.hpp"
     
    5864
    5965// new transport and addressing
    60 #include "ariba/utility/addressing/addressing.hpp"
    61 #include "ariba/utility/transport/transport.hpp"
    62 #include "ariba/utility/transport/transport_connection.hpp"
     66#include "ariba/utility/transport/transport_peer.hpp"
     67#include "ariba/utility/transport/interfaces/transport_connection.hpp"
     68#include "ariba/utility/transport/interfaces/transport_listener.hpp"
     69#include "ariba/utility/addressing2/endpoint.hpp"
    6370
    6471// communication
     
    7279#include "ariba/communication/networkinfo/NetworkInformation.h"
    7380
    74 // disabled
    75 //#ifndef UNDERLAY_OMNET
    76 //  #include "ariba/communication/modules/transport/tcp/TCPTransport.h"
    77 //  #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"
    78 //  using ariba::communication::IPv4NetworkProtocol;
    79 //  using ariba::communication::TCPTransport;
    80 //#endif
    81 
    8281namespace ariba {
    83   class SideportListener;
     82    class SideportListener;
    8483}
    8584
     
    8786namespace communication {
    8887
     88
     89class communication_message_not_sent: public std::runtime_error
     90{
     91public:
     92    /** Takes a character string describing the error.  */
     93    explicit communication_message_not_sent(const string& __arg)  :
     94        std::runtime_error(__arg)
     95    {
     96    }
     97   
     98    virtual ~communication_message_not_sent() throw() {}
     99};
     100
     101
     102
    89103using namespace std;
    90 using namespace ariba::addressing;
    91104using namespace ariba::transport;
    92105using namespace ariba::utility;
     
    102115 * protocols and addressing schemes.
    103116 *
    104  * @author Sebastian Mies, Christoph Mayer
     117 * @author Sebastian Mies, Christoph Mayer, Mario Hock
    105118 */
    106119class BaseCommunication:
    107120        public NetworkChangeInterface,
    108         public SystemEventListener,
    109121        public transport_listener {
    110122
     
    120132
    121133        /// Startup the base communication, start modules etc.
    122         void start();
     134        void start(addressing2::EndpointSetPtr listen_on);
    123135
    124136        /// Stops the base communication, stop modules etc.
    125137        void stop();
    126 
    127         /// Sets the endpoints
    128         void setEndpoints( string& endpoints );
    129138
    130139        /// Check whether the base communication has been started up
     
    147156         * @return A sequence number for this message
    148157         */
    149         seqnum_t sendMessage(const LinkID lid, const Message* message);
     158        seqnum_t sendMessage(const LinkID& lid,
     159                reboost::message_t message,
     160                uint8_t priority,
     161                bool bypass_overlay = false) throw(communication_message_not_sent);
    150162
    151163        /**
     
    164176         * @return List of LinkID
    165177         */
    166         LinkIDs getLocalLinks(const address_v* addr) const;
     178//      LinkIDs getLocalLinks(const address_v* addr) const;  // XXX aktuell
    167179
    168180        /**
     
    187199
    188200        void unregisterEventListener(CommunicationEvents* _events);
    189 
    190         /// called when a system event is emitted by system queue
    191         virtual void handleSystemEvent(const SystemEvent& event);
    192201
    193202        /**
     
    196205         */
    197206        virtual void receive_message(transport_connection::sptr connection,
    198                 reboost::message_t msg);
    199 
     207                reboost::shared_buffer_t msg);
     208
     209    /**
     210     * called within the ASIO thread
     211     * when a connection is terminated (e.g. TCP close)
     212     */
     213    virtual void connection_terminated(transport_connection::sptr connection);
     214
     215    addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid);
     216    addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid);
     217
     218       
    200219protected:
    201220
     
    205224         */
    206225        void receiveMessage(transport_connection::sptr connection,
    207                 reboost::message_t msg);
     226                reboost::shared_buffer_t message);
     227       
     228    /**
     229     * called within the ARIBA thread (System Queue)
     230     * when a connection is terminated (e.g. TCP close)
     231     */
     232    void connectionTerminated(transport_connection::sptr connection);
     233       
    208234
    209235        /// called when a network interface change happens
     
    221247                /// default constructor
    222248                LinkDescriptor() :
    223                         localLink(LinkID::UNSPECIFIED), localLocator(NULL),
    224                         remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),
     249                        localLink(LinkID::UNSPECIFIED),
     250                        remoteLink(LinkID::UNSPECIFIED),
    225251                        up(false) {
    226252                }
    227253
    228                 ~LinkDescriptor() {
    229                         if (localLocator!=NULL)  delete localLocator;
    230                         if (remoteLocator!=NULL) delete remoteLocator;
     254                ~LinkDescriptor()
     255                {
     256                        if ( connection )
     257                        {
     258                            connection->unregister_communication_link(&localLink);
     259                        }
    231260                }
    232261
     
    240269                        return *unspec;
    241270                }
     271               
     272               
     273                transport_connection::sptr get_connection() const
     274                {
     275                    return connection;
     276                }
     277               
     278                void set_connection(const transport_connection::sptr& conn)
     279                {
     280                    // unregister from old connection,
     281                    // if any (but normally there shouldn't..)
     282                    if ( connection )
     283                    {
     284                        connection->unregister_communication_link(&localLink);
     285                    }
     286
     287                    // * set_connection *
     288                    connection = conn;
     289                   
     290                    // register this link with the connection
     291                    conn->register_communication_link(&localLink);
     292                }
    242293
    243294                bool unspecified;
     
    245296                /// link identifiers
    246297                LinkID localLink;
    247                 const address_v* localLocator;
     298                addressing2::EndpointPtr localLocator;
    248299
    249300                /// used underlay addresses for the link
    250301                LinkID remoteLink;
    251                 const address_v* remoteLocator;
     302                addressing2::EndpointPtr remoteLocator;
    252303
    253304                /// the remote end-point descriptor
    254                 EndpointDescriptor remoteEndpoint;
     305                EndpointDescriptor remoteDescriptor;
    255306
    256307                /// flag, whether this link is up
    257308                bool up;
    258309               
     310               
     311        private:
    259312                /// connection if link is up
    260313                transport_connection::sptr connection;
     
    281334        /// The local end-point descriptor
    282335        EndpointDescriptor localDescriptor;
    283 
    284 #ifndef UNDERLAY_OMNET
     336       
     337        /**
     338         * endpoint_set holding the addresses of the "server"-sockets,
     339         * ---> that should be opened
     340         *
     341         * (e.g. 0.0.0.0:41322)
     342         */
     343        addressing2::EndpointSetPtr listenOn_endpoints;
     344       
     345    /**
     346     * endpoint_set holding the addresses of the "server"-sockets,
     347     * ---> that are actually open
     348     *
     349     * (e.g. 0.0.0.0:41322)
     350     *
     351     * XXX should only be in transport_peer
     352     */
     353        addressing2::EndpointSetPtr active_listenOn_endpoints;
     354       
     355    /**
     356     * endpoint_set holding the addresses of the "server"-sockets,
     357     * ---> here the discovered "addressable" addresses are stored
     358     *
     359     * (e.g. 192.168.0.5:41322)
     360     *
     361     * XXX should only be in localDescriptor
     362     */
     363        addressing2::EndpointSetPtr local_endpoints;
     364
    285365        /// network change detector
    286366        NetworkChangeDetection networkMonitor;
    287 #endif
    288367
    289368        /// list of all remote addresses of links to end-points
    290         class endpoint_reference {
    291         public:
    292                 int count; ///< the number of open links to this end-point
    293                 const address_v* endpoint; ///< the end-point itself
    294         };
    295         vector<endpoint_reference> remote_endpoints;
    296 
    297         /// adds an end-point to the list
    298         void add_endpoint( const address_v* endpoint );
    299 
    300         /// removes an end-point from the list
    301         void remove_endpoint( const address_v* endpoint );
     369        // XXX DEPRECATED
     370//      class endpoint_reference {
     371//      public:
     372//              int count; ///< the number of open links to this end-point
     373//              const address_v* endpoint; ///< the end-point itself
     374//      };
     375//      vector<endpoint_reference> remote_endpoints;
     376
     377        // XXX DEPRECATED
     378//      /// adds an end-point to the list
     379//      void add_endpoint( const address_v* endpoint );
     380//
     381//      /// removes an end-point from the list
     382//      void remove_endpoint( const address_v* endpoint );
    302383
    303384        /// event listener
     
    314395        MessageReceiver* messageReceiver;
    315396
    316         /// convenience: send message to peer
    317         void send( Message* message, const EndpointDescriptor& endpoint );
    318         void send( Message* message, const LinkDescriptor& descriptor );
     397       
     398        /*
     399         * Sends a message over an existing link.
     400         *   ---> Adds »Link Message« Header
     401         */
     402        bool send_over_link(
     403                const uint8_t type,
     404                reboost::message_t message,
     405                const LinkDescriptor& desc,
     406                const uint8_t priority);
     407
     408    /*
     409     * Sends a message to a known peer. (To all known endpoints.)
     410     *   ---> Adds »Peer Message« Header
     411     */
     412        void send_to_peer(
     413                const uint8_t type,
     414                const PeerID& peer_id,
     415                reboost::message_t message,
     416                const EndpointDescriptor& endpoint,
     417                const uint8_t priority );
     418       
    319419
    320420        /// state of the base communication
Note: See TracChangeset for help on using the changeset viewer.