source: source/ariba/communication/BaseCommunication.h@ 12745

Last change on this file since 12745 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only the mechanisms so far, for upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4cxx is not working (use cout-logging)

This patch brings great performance and stability improvements at the cost of backward compatibility.
Also, bluetooth and the bootstrap modules have not yet been ported to the new interfaces.

File size: 11.7 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef BASECOMMUNICATION_H_
40#define BASECOMMUNICATION_H_
41
42// boost & std includes
43#include <boost/unordered_map.hpp>
44#include <boost/unordered_set.hpp>
45#include <map>
46#include <set>
47#include <vector>
48#include <iostream>
49#include <algorithm>
50#include <boost/foreach.hpp>
51
// Convenience loop macro `foreach(a, b)`:
//  - in normal builds it expands to BOOST_FOREACH(a, b);
//  - when ECLIPSE_PARSER is defined it expands to a range-based for loop
//    instead (presumably so the Eclipse indexer can parse loop bodies --
//    TODO confirm).
52#ifdef ECLIPSE_PARSER
53 #define foreach(a, b) for(a : b)
54#else
55 #define foreach(a, b) BOOST_FOREACH(a, b)
56#endif
57
58// utilities
59#include "ariba/utility/types.h"
60#include "ariba/utility/messages/MessageReceiver.h"
61#include "ariba/utility/logging/Logging.h"
62#include "ariba/utility/misc/Demultiplexer.hpp"
63#include "ariba/utility/system/SystemEventListener.h"
64
65// new transport and addressing
66#include "ariba/utility/transport/transport_peer.hpp"
67#include "ariba/utility/transport/interfaces/transport_connection.hpp"
68#include "ariba/utility/transport/interfaces/transport_listener.hpp"
69#include "ariba/utility/addressing2/endpoint.hpp"
70
71// communication
72#include "ariba/communication/CommunicationEvents.h"
73#include "ariba/communication/EndpointDescriptor.h"
74#include "ariba/communication/messages/AribaBaseMsg.h"
75
76// network changes
77#include "ariba/communication/networkinfo/NetworkChangeInterface.h"
78#include "ariba/communication/networkinfo/NetworkChangeDetection.h"
79#include "ariba/communication/networkinfo/NetworkInformation.h"
80
// Forward declaration only: BaseCommunication names ariba::SideportListener
// as a friend, which does not require the full class definition here.
81namespace ariba {
82 class SideportListener;
83}
84
85namespace ariba {
86namespace communication {
87
88
/**
 * Exception type named in the exception specification of
 * BaseCommunication::sendMessage.
 *
 * It is a plain std::runtime_error subtype: the text passed to the
 * constructor is what what() returns.
 */
class communication_message_not_sent: public std::runtime_error
{
public:
    /**
     * Takes a character string describing the error.
     *
     * @param message Human readable description, returned by what().
     */
    // std::string is spelled out because no using-directive for std is in
    // effect at this point of the header, and the parameter name avoids the
    // double underscore reserved for the implementation.
    explicit communication_message_not_sent(const std::string& message) :
        std::runtime_error(message)
    {
    }

    /// Non-throwing destructor, matching std::runtime_error's.
    virtual ~communication_message_not_sent() throw() {}
};
100
101
102
// NOTE(review): these using-directives appear inside namespace
// ariba::communication in a header, so they also take effect in every
// translation unit that includes it. The declarations further down use
// unqualified names such as vector and set, so the directives cannot simply
// be removed without qualifying those uses first.
103using namespace std;
104using namespace ariba::transport;
105using namespace ariba::utility;
106
107// use base ariba types (clarifies multiple definitions)
108using ariba::utility::Message;
109using ariba::utility::seqnum_t;
110
111/**
112 * This class implements the Ariba Base Communication<br />
113 *
114 * Its primary task is to provide an abstraction to existing
115 * protocols and addressing schemes.
116 *
117 * @author Sebastian Mies, Christoph Mayer, Mario Hock
118 */
119class BaseCommunication:
120 public NetworkChangeInterface,
121 public transport_listener {
122
123 use_logging_h(BaseCommunication);
124 friend class ariba::SideportListener;
125
126public:
127 /// Default ctor that just creates an non-functional base communication
128 BaseCommunication();
129
130 /// Default dtor that does nothing
131 virtual ~BaseCommunication();
132
133 /// Startup the base communication, start modules etc.
134 void start(addressing2::EndpointSetPtr listen_on);
135
136 /// Stops the base communication, stop modules etc.
137 void stop();
138
139 /// Check whether the base communication has been started up
140 bool isStarted();
141
142 /// Establishes a link to another end-point.
143 const LinkID establishLink(const EndpointDescriptor& descriptor,
144 const LinkID& linkid = LinkID::UNSPECIFIED, const QoSParameterSet& qos =
145 QoSParameterSet::DEFAULT, const SecurityParameterSet& sec =
146 SecurityParameterSet::DEFAULT);
147
148 /// Drops a link
149 void dropLink(const LinkID link);
150
151 /**
152 * Sends a message though an existing link to an end-point.
153 *
154 * @param lid The link identifier
155 * @param message The message to be sent
156 * @return A sequence number for this message
157 */
158 seqnum_t sendMessage(const LinkID& lid,
159 reboost::message_t message,
160 uint8_t priority,
161 bool bypass_overlay = false) throw(communication_message_not_sent);
162
163 /**
164 * Returns the end-point descriptor
165 *
166 * @param link the link id of the requested end-point
167 * @return The end-point descriptor of the link's end-point
168 */
169 const EndpointDescriptor& getEndpointDescriptor(const LinkID link =
170 LinkID::UNSPECIFIED) const;
171
172 /**
173 * Get local links to the given endpoint of all local link
174 * using the default parameter EndpointDescriptor::UNSPECIFIED
175 * @param ep The remote endpoint to get all links to.
176 * @return List of LinkID
177 */
178// LinkIDs getLocalLinks(const address_v* addr) const; // XXX aktuell
179
180 /**
181 * Registers a receiver.
182 *
183 * @param _receiver The receiving side
184 */
185 void registerMessageReceiver(MessageReceiver* receiver) {
186 messageReceiver = receiver;
187 }
188
189 /**
190 * Unregister a receiver.
191 *
192 * @param _receiver The receiving side
193 */
194 void unregisterMessageReceiver(MessageReceiver* receiver) {
195 messageReceiver = NULL;
196 }
197
198 void registerEventListener(CommunicationEvents* _events);
199
200 void unregisterEventListener(CommunicationEvents* _events);
201
202 /**
203 * called within the ASIO thread
204 * when a message is received from underlay transport
205 */
206 virtual void receive_message(transport_connection::sptr connection,
207 reboost::shared_buffer_t msg);
208
209 /**
210 * called within the ASIO thread
211 * when a connection is terminated (e.g. TCP close)
212 */
213 virtual void connection_terminated(transport_connection::sptr connection);
214
215 addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid);
216 addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid);
217
218
219protected:
220
221 /**
222 * called within the ARIBA thread (System Queue)
223 * when a message is received from underlay transport
224 */
225 void receiveMessage(transport_connection::sptr connection,
226 reboost::shared_buffer_t message);
227
228 /**
229 * called within the ARIBA thread (System Queue)
230 * when a connection is terminated (e.g. TCP close)
231 */
232 void connectionTerminated(transport_connection::sptr connection);
233
234
235 /// called when a network interface change happens
236 virtual void onNetworkChange(
237 const NetworkChangeInterface::NetworkChangeInfo& info);
238
239private:
240 /**
241 * A link descriptor consisting of the end-point descriptor and currently
242 * used underlay address.
243 */
244 class LinkDescriptor {
245 public:
246
247 /// default constructor
248 LinkDescriptor() :
249 localLink(LinkID::UNSPECIFIED),
250 remoteLink(LinkID::UNSPECIFIED),
251 up(false) {
252 }
253
254 ~LinkDescriptor()
255 {
256 if ( connection )
257 {
258 connection->unregister_communication_link(&localLink);
259 }
260 }
261
262 bool isUnspecified() const {
263 return (this == &UNSPECIFIED());
264 }
265
266 static LinkDescriptor& UNSPECIFIED(){
267 static LinkDescriptor* unspec = NULL;
268 if(unspec == NULL) unspec = new LinkDescriptor();
269 return *unspec;
270 }
271
272
273 transport_connection::sptr get_connection() const
274 {
275 return connection;
276 }
277
278 void set_connection(const transport_connection::sptr& conn)
279 {
280 // unregister from old connection,
281 // if any (but normally there shouldn't..)
282 if ( connection )
283 {
284 connection->unregister_communication_link(&localLink);
285 }
286
287 // * set_connection *
288 connection = conn;
289
290 // register this link with the connection
291 conn->register_communication_link(&localLink);
292 }
293
294 bool unspecified;
295
296 /// link identifiers
297 LinkID localLink;
298 addressing2::EndpointPtr localLocator;
299
300 /// used underlay addresses for the link
301 LinkID remoteLink;
302 addressing2::EndpointPtr remoteLocator;
303
304 /// the remote end-point descriptor
305 EndpointDescriptor remoteDescriptor;
306
307 /// flag, whether this link is up
308 bool up;
309
310
311 private:
312 /// connection if link is up
313 transport_connection::sptr connection;
314 };
315
316 /// Link management: list of links
317 typedef vector<LinkDescriptor*> LinkSet;
318
319 /// Link management: the set of currently managed links
320 LinkSet linkSet;
321
322 /// Link management: add a link
323 void addLink( LinkDescriptor* link );
324
325 /// Link management: remove a link
326 void removeLink(const LinkID& localLink);
327
328 /// Link management: get link information using the local link
329 LinkDescriptor& queryLocalLink(const LinkID& localLink) const;
330
331 /// Link management: get link information using the remote link
332 LinkDescriptor& queryRemoteLink(const LinkID& remoteLink) const;
333
334 /// The local end-point descriptor
335 EndpointDescriptor localDescriptor;
336
337 /**
338 * endpoint_set holding the addresses of the "server"-sockets,
339 * ---> that should be opened
340 *
341 * (e.g. 0.0.0.0:41322)
342 */
343 addressing2::EndpointSetPtr listenOn_endpoints;
344
345 /**
346 * endpoint_set holding the addresses of the "server"-sockets,
347 * ---> that are actually open
348 *
349 * (e.g. 0.0.0.0:41322)
350 *
351 * XXX should only be in transport_peer
352 */
353 addressing2::EndpointSetPtr active_listenOn_endpoints;
354
355 /**
356 * endpoint_set holding the addresses of the "server"-sockets,
357 * ---> here the discovered "addressable" addresses are stored
358 *
359 * (e.g. 192.168.0.5:41322)
360 *
361 * XXX should only be in localDescriptor
362 */
363 addressing2::EndpointSetPtr local_endpoints;
364
365 /// network change detector
366 NetworkChangeDetection networkMonitor;
367
368 /// list of all remote addresses of links to end-points
369 // XXX DEPRECATED
370// class endpoint_reference {
371// public:
372// int count; ///< the number of open links to this end-point
373// const address_v* endpoint; ///< the end-point itself
374// };
375// vector<endpoint_reference> remote_endpoints;
376
377 // XXX DEPRECATED
378// /// adds an end-point to the list
379// void add_endpoint( const address_v* endpoint );
380//
381// /// removes an end-point from the list
382// void remove_endpoint( const address_v* endpoint );
383
384 /// event listener
385 typedef set<CommunicationEvents*> EventListenerSet;
386 EventListenerSet eventListener;
387
388 /// sequence numbers
389 seqnum_t currentSeqnum;
390
391 /// transport peer
392 transport_peer* transport;
393
394 /// the base overlay message receiver
395 MessageReceiver* messageReceiver;
396
397
398 /*
399 * Sends a message over an existing link.
400 * ---> Adds »Link Message« Header
401 */
402 bool send_over_link(
403 const uint8_t type,
404 reboost::message_t message,
405 const LinkDescriptor& desc,
406 const uint8_t priority);
407
408 /*
409 * Sends a message to a known peer. (To all known endpoints.)
410 * ---> Adds »Peer Message« Header
411 */
412 void send_to_peer(
413 const uint8_t type,
414 const PeerID& peer_id,
415 reboost::message_t message,
416 const EndpointDescriptor& endpoint,
417 const uint8_t priority );
418
419
420 /// state of the base communication
421 bool started;
422
423};
424
425}} // namespace ariba, communication
426
427#endif /* BASECOMMUNICATION_H_ */
Note: See TracBrowser for help on using the repository browser.