An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/communication/BaseCommunication.h @ 12060

Last change on this file since 12060 was 12060, checked in by hock@…, 9 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue? can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery? discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg?)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers? (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 11.7 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef BASECOMMUNICATION_H_
40#define BASECOMMUNICATION_H_
41
42// boost & std includes
43#include <boost/unordered_map.hpp>
44#include <boost/unordered_set.hpp>
45#include <map>
46#include <set>
47#include <vector>
48#include <iostream>
49#include <algorithm>
50#include <boost/foreach.hpp>
51
52#ifdef ECLIPSE_PARSER
53    #define foreach(a, b) for(a : b)
54#else
55    #define foreach(a, b) BOOST_FOREACH(a, b)
56#endif
57
58// utilities
59#include "ariba/utility/types.h"
60#include "ariba/utility/messages/MessageReceiver.h"
61#include "ariba/utility/logging/Logging.h"
62#include "ariba/utility/misc/Demultiplexer.hpp"
63#include "ariba/utility/system/SystemEventListener.h"
64
65// new transport and addressing
66#include "ariba/utility/transport/transport_peer.hpp"
67#include "ariba/utility/transport/interfaces/transport_connection.hpp"
68#include "ariba/utility/transport/interfaces/transport_listener.hpp"
69#include "ariba/utility/addressing2/endpoint.hpp"
70
71// communication
72#include "ariba/communication/CommunicationEvents.h"
73#include "ariba/communication/EndpointDescriptor.h"
74#include "ariba/communication/messages/AribaBaseMsg.h"
75
76// network changes
77#include "ariba/communication/networkinfo/NetworkChangeInterface.h"
78#include "ariba/communication/networkinfo/NetworkChangeDetection.h"
79#include "ariba/communication/networkinfo/NetworkInformation.h"
80
81namespace ariba {
82    class SideportListener;
83}
84
85namespace ariba {
86namespace communication {
87
88
89class communication_message_not_sent: public std::runtime_error
90{
91public:
92    /** Takes a character string describing the error.  */
93    explicit communication_message_not_sent(const string& __arg)  :
94        std::runtime_error(__arg)
95    {
96    }
97   
98    virtual ~communication_message_not_sent() throw() {}
99};
100
101
102
103using namespace std;
104using namespace ariba::transport;
105using namespace ariba::utility;
106
107// use base ariba types (clarifies multiple definitions)
108using ariba::utility::Message;
109using ariba::utility::seqnum_t;
110
111/**
112 * This class implements the Ariba Base Communication<br />
113 *
114 * Its primary task is to provide an abstraction to existing
115 * protocols and addressing schemes.
116 *
117 * @author Sebastian Mies, Christoph Mayer, Mario Hock
118 */
119class BaseCommunication:
120        public NetworkChangeInterface,
121        public transport_listener {
122
123        use_logging_h(BaseCommunication);
124        friend class ariba::SideportListener;
125
126public:
127        /// Default ctor that just creates an non-functional base communication
128        BaseCommunication();
129
130        /// Default dtor that does nothing
131        virtual ~BaseCommunication();
132
133        /// Startup the base communication, start modules etc.
134        void start(addressing2::EndpointSetPtr listen_on);
135
136        /// Stops the base communication, stop modules etc.
137        void stop();
138
139        /// Check whether the base communication has been started up
140        bool isStarted();
141
142        /// Establishes a link to another end-point.
143        const LinkID establishLink(const EndpointDescriptor& descriptor,
144                const LinkID& linkid = LinkID::UNSPECIFIED, const QoSParameterSet& qos =
145                                QoSParameterSet::DEFAULT, const SecurityParameterSet& sec =
146                                SecurityParameterSet::DEFAULT);
147
148        /// Drops a link
149        void dropLink(const LinkID link);
150
151        /**
152         * Sends a message though an existing link to an end-point.
153         *
154         * @param lid The link identifier
155         * @param message The message to be sent
156         * @return A sequence number for this message
157         */
158        seqnum_t sendMessage(const LinkID& lid,
159                reboost::message_t message,
160                uint8_t priority,
161                bool bypass_overlay = false) throw(communication_message_not_sent);
162
163        /**
164         * Returns the end-point descriptor
165         *
166         * @param link the link id of the requested end-point
167         * @return The end-point descriptor of the link's end-point
168         */
169        const EndpointDescriptor& getEndpointDescriptor(const LinkID link =
170                        LinkID::UNSPECIFIED) const;
171
172        /**
173         * Get local links to the given endpoint of all local link
174         * using the default parameter EndpointDescriptor::UNSPECIFIED
175         * @param ep The remote endpoint to get all links to.
176         * @return List of LinkID
177         */
178//      LinkIDs getLocalLinks(const address_v* addr) const;  // XXX aktuell
179
180        /**
181         * Registers a receiver.
182         *
183         * @param _receiver The receiving side
184         */
185        void registerMessageReceiver(MessageReceiver* receiver) {
186                messageReceiver = receiver;
187        }
188
189        /**
190         * Unregister a receiver.
191         *
192         * @param _receiver The receiving side
193         */
194        void unregisterMessageReceiver(MessageReceiver* receiver) {
195                messageReceiver = NULL;
196        }
197
198        void registerEventListener(CommunicationEvents* _events);
199
200        void unregisterEventListener(CommunicationEvents* _events);
201
202        /**
203         * called within the ASIO thread
204         * when a message is received from underlay transport
205         */ 
206        virtual void receive_message(transport_connection::sptr connection,
207                reboost::shared_buffer_t msg);
208
209    /**
210     * called within the ASIO thread
211     * when a connection is terminated (e.g. TCP close)
212     */ 
213    virtual void connection_terminated(transport_connection::sptr connection);
214
215    addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid);
216    addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid);
217
218       
219protected:
220
221        /**
222         * called within the ARIBA thread (System Queue)
223         * when a message is received from underlay transport
224         */ 
225        void receiveMessage(transport_connection::sptr connection,
226                reboost::shared_buffer_t message);
227       
228    /**
229     * called within the ARIBA thread (System Queue)
230     * when a connection is terminated (e.g. TCP close)
231     */ 
232    void connectionTerminated(transport_connection::sptr connection);
233       
234
235        /// called when a network interface change happens
236        virtual void onNetworkChange(
237                const NetworkChangeInterface::NetworkChangeInfo& info);
238
239private:
240        /**
241         * A link descriptor consisting of the end-point descriptor and currently
242         * used underlay address.
243         */
244        class LinkDescriptor {
245        public:
246
247                /// default constructor
248                LinkDescriptor() :
249                        localLink(LinkID::UNSPECIFIED),
250                        remoteLink(LinkID::UNSPECIFIED),
251                        up(false) {
252                }
253
254                ~LinkDescriptor()
255                {
256                        if ( connection )
257                        {
258                            connection->unregister_communication_link(&localLink);
259                        }
260                }
261
262                bool isUnspecified() const {
263                        return (this == &UNSPECIFIED());
264                }
265
266                static LinkDescriptor& UNSPECIFIED(){
267                        static LinkDescriptor* unspec = NULL;
268                        if(unspec == NULL) unspec = new LinkDescriptor();
269                        return *unspec;
270                }
271               
272               
273                transport_connection::sptr get_connection() const
274                {
275                    return connection;
276                }
277               
278                void set_connection(const transport_connection::sptr& conn)
279                {
280                    // unregister from old connection,
281                    // if any (but normally there shouldn't..)
282                    if ( connection )
283                    {
284                        connection->unregister_communication_link(&localLink);
285                    }
286
287                    // * set_connection *
288                    connection = conn;
289                   
290                    // register this link with the connection
291                    conn->register_communication_link(&localLink);
292                }
293
294                bool unspecified;
295
296                /// link identifiers
297                LinkID localLink;
298                addressing2::EndpointPtr localLocator;
299
300                /// used underlay addresses for the link
301                LinkID remoteLink;
302                addressing2::EndpointPtr remoteLocator;
303
304                /// the remote end-point descriptor
305                EndpointDescriptor remoteDescriptor;
306
307                /// flag, whether this link is up
308                bool up;
309               
310               
311        private:
312                /// connection if link is up
313                transport_connection::sptr connection;
314        };
315
316        /// Link management: list of links
317        typedef vector<LinkDescriptor*> LinkSet;
318
319        /// Link management: the set of currently managed links
320        LinkSet linkSet;
321
322        /// Link management: add a link
323        void addLink( LinkDescriptor* link );
324
325        /// Link management: remove a link
326        void removeLink(const LinkID& localLink);
327
328        /// Link management: get link information using the local link
329        LinkDescriptor& queryLocalLink(const LinkID& localLink) const;
330
331        /// Link management: get link information using the remote link
332        LinkDescriptor& queryRemoteLink(const LinkID& remoteLink) const;
333
334        /// The local end-point descriptor
335        EndpointDescriptor localDescriptor;
336       
337        /**
338         * endpoint_set holding the addresses of the "server"-sockets,
339         * ---> that should be opened
340         *
341         * (e.g. 0.0.0.0:41322)
342         */
343        addressing2::EndpointSetPtr listenOn_endpoints;
344       
345    /**
346     * endpoint_set holding the addresses of the "server"-sockets,
347     * ---> that are actually open
348     *
349     * (e.g. 0.0.0.0:41322)
350     *
351     * XXX should only be in transport_peer
352     */
353        addressing2::EndpointSetPtr active_listenOn_endpoints;
354       
355    /**
356     * endpoint_set holding the addresses of the "server"-sockets,
357     * ---> here the discovered "addressable" addresses are stored
358     *
359     * (e.g. 192.168.0.5:41322)
360     *
361     * XXX should only be in localDescriptor
362     */
363        addressing2::EndpointSetPtr local_endpoints;
364
365        /// network change detector
366        NetworkChangeDetection networkMonitor;
367
368        /// list of all remote addresses of links to end-points
369        // XXX DEPRECATED
370//      class endpoint_reference {
371//      public:
372//              int count; ///< the number of open links to this end-point
373//              const address_v* endpoint; ///< the end-point itself
374//      };
375//      vector<endpoint_reference> remote_endpoints;
376
377        // XXX DEPRECATED
378//      /// adds an end-point to the list
379//      void add_endpoint( const address_v* endpoint );
380//
381//      /// removes an end-point from the list
382//      void remove_endpoint( const address_v* endpoint );
383
384        /// event listener
385        typedef set<CommunicationEvents*> EventListenerSet;
386        EventListenerSet eventListener;
387
388        /// sequence numbers
389        seqnum_t currentSeqnum;
390
391        /// transport peer
392        transport_peer* transport;
393
394        /// the base overlay message receiver
395        MessageReceiver* messageReceiver;
396
397       
398        /*
399         * Sends a message over an existing link.
400         *   ---> Adds »Link Message« Header
401         */
402        bool send_over_link(
403                const uint8_t type,
404                reboost::message_t message,
405                const LinkDescriptor& desc,
406                const uint8_t priority);
407
408    /*
409     * Sends a message to a known peer. (To all known endpoints.)
410     *   ---> Adds »Peer Message« Header
411     */
412        void send_to_peer(
413                const uint8_t type,
414                const PeerID& peer_id,
415                reboost::message_t message,
416                const EndpointDescriptor& endpoint,
417                const uint8_t priority );
418       
419
420        /// state of the base communication
421        bool started;
422
423};
424
425}} // namespace ariba, communication
426
427#endif /* BASECOMMUNICATION_H_ */
Note: See TracBrowser for help on using the repository browser.