source: source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h@ 12745

Last change on this file since 12745 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only the mechanisms so far, for upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at the cost of backward compatibility.
Also, bluetooth and the bootstrap modules have not yet been ported to the new interfaces.

File size: 10.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef __PERIODIC_BROADCAST_H
40#define __PERIODIC_BROADCAST_H
41
42// ariba
43#include "ariba/config.h"
44#include "ariba/utility/bootstrap/modules/BootstrapModule.h"
45#include "ariba/utility/logging/Logging.h"
46#include "ariba/utility/system/Timer.h"
47
48// ariba messages
49#include "PeriodicBroadcastMessage.h"
50
51// (ariba) link-local
52#include "ariba/utility/transport/StreamTransport/StreamTransport.hpp"
53
54// system
55#include <map>
56#include <string>
57#include <ctime>
58#include <iostream>
59
60// boost
61#include <boost/asio.hpp>
62#include <boost/foreach.hpp>
63#include <boost/thread/mutex.hpp>
64#include <boost/thread/thread.hpp>
65
66
67using std::map;
68using std::string;
69using boost::asio::ip::udp;
70
71namespace ariba {
72namespace utility {
73
74class PeriodicBroadcast : public BootstrapModule, public Timer {
75 use_logging_h(PeriodicBroadcast);
76public:
77 PeriodicBroadcast(BootstrapInformationCallback* _callback, string info);
78 virtual ~PeriodicBroadcast();
79
80 virtual void start();
81 virtual void stop();
82
83 virtual string getName();
84 virtual string getInformation();
85 virtual bool isFunctional();
86 virtual void publishService(string name, string info1, string info2, string info3);
87 virtual void revokeService(string name);
88
89protected:
90 virtual void eventFunction();
91
92private:
93 void sendLocalServices();
94 void updateRemoteServices();
95
96 static const long timerinterval; // used to send out updates on our services and check for new services
97 static const long servicetimeout; // timeout after that a service is dead when we did not receive updates
98 static const unsigned int serverport_v4;
99 static const unsigned int serverport_v6;
100
// Value type describing one announced service: a name, three free-form
// information strings and the time the service was last seen.
//
// Within this header the name is used as the key of ServiceList, and
// lastseen is set to time(NULL) whenever an announcement for the service
// is received.
//
// Copy construction and copy assignment are intentionally left to the
// compiler: all members are value types, so the generated member-wise
// operations are exactly what is needed.
class Service {
private:
    string name;
    string info1;
    string info2;
    string info3;
    time_t lastseen;

public:
    // Creates an empty service: all strings empty, lastseen == 0.
    Service()
        : name(), info1(), info2(), info3(), lastseen(0) {
    }

    // Creates a service from its name and information strings.
    // _lastseen defaults to 0 when the caller has no time stamp yet.
    Service(const string& _name, const string& _info1,
            const string& _info2, const string& _info3, const time_t& _lastseen = 0)
        : name(_name), info1(_info1), info2(_info2), info3(_info3), lastseen(_lastseen) {
    }

    // Accessors return copies, so the results stay valid independently
    // of the Service object they were read from.
    string getName() const {
        return name;
    }

    string getInfo1() const {
        return info1;
    }

    string getInfo2() const {
        return info2;
    }

    string getInfo3() const {
        return info3;
    }

    time_t getLastseen() const {
        return lastseen;
    }

    // Mutators take their argument by const reference to avoid an extra
    // temporary copy; call sites passing a string are unaffected.
    void setName(const string& _name){
        name = _name;
    }

    void setInfo1(const string& _info1){
        info1 = _info1;
    }

    void setInfo2(const string& _info2){
        info2 = _info2;
    }

    void setInfo3(const string& _info3){
        info3 = _info3;
    }

    void setLastseen(time_t _lastseen){
        lastseen = _lastseen;
    }
};
180
181 typedef map<string,Service> ServiceList;
182
183 ServiceList localServices;
184 boost::mutex localServicesMutex;
185
186 ServiceList remoteServices;
187 boost::mutex remoteServicesMutex;
188
189 ServiceList newRemoteServices;
190 boost::mutex newRemoteServicesMutex;
191
192 boost::asio::io_service io_service;
193 boost::thread* io_service_thread;
194 static void threadFunc(PeriodicBroadcast* obj);
195
196 class udp_server {
197 private:
198 udp::socket socket_v4;
199 udp::socket socket_v6;
200 udp::endpoint remote_endpoint_;
201 boost::array<char, 1500> recv_buffer_4;
202 boost::array<char, 1500> recv_buffer_6;
203 ServiceList* services;
204 boost::mutex* servicesmutex;
205
206 public:
207 udp_server(boost::asio::io_service& io_service, ServiceList* _services, boost::mutex* _servicesmutex)
208 : socket_v4(io_service), socket_v6(io_service),
209 services(_services), servicesmutex(_servicesmutex) {
210
211 if( open4() ) start_receive_4();
212 if( open6() ) start_receive_6();
213 }
214
215 bool open4(){
216 boost::system::error_code err;
217
218 boost::asio::ip::udp::endpoint listen_endpoint_v4(
219 boost::asio::ip::address_v4::any(),
220 PeriodicBroadcast::serverport_v4);
221
222 err = socket_v4.open( listen_endpoint_v4.protocol(), err );
223 if(err){
224 logging_warn("failed opening ipv4 socket");
225 return false;
226 }
227
228 err = socket_v4.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
229 if(err){
230 logging_warn("failed setting reuse address option on ipv4 socket");
231 return false;
232 }
233
234 err = socket_v4.set_option( boost::asio::socket_base::broadcast(true), err );
235 if(err){
236 logging_warn("failed setting broadcast option on ipv4 socket");
237 return false;
238 }
239
240 err = socket_v4.bind( listen_endpoint_v4, err );
241 if(err){
242 logging_warn("failed binding ipv4 socket");
243 return false;
244 }
245
246 return true;
247 }
248
249 bool open6(){
250 boost::system::error_code err;
251
252 boost::asio::ip::udp::endpoint listen_endpoint_v6(
253 boost::asio::ip::address_v6::any(),
254 PeriodicBroadcast::serverport_v6);
255
256 err = socket_v6.open( listen_endpoint_v6.protocol(), err );
257 if(err){
258 logging_warn("failed opening ipv6 socket");
259 return false;
260 }
261
262 err = socket_v6.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
263 if(err){
264 logging_warn("failed setting reuse address option on ipv6 socket");
265 return false;
266 }
267
268 err = socket_v6.set_option( boost::asio::socket_base::broadcast(true), err );
269 if(err){
270 logging_warn("failed setting broadcast option on ipv6 socket");
271 return false;
272 }
273
274 err = socket_v6.bind( listen_endpoint_v6, err );
275 if(err){
276 logging_warn("failed binding ipv6 socket");
277 return false;
278 }
279
280 return true;
281 }
282
283 void sendservice(Service service){
284
285 PeriodicBroadcastMessage msg;
286 if(service.getName().empty()) return;
287
288 msg.setName( service.getName() );
289 msg.setInfo1( service.getInfo1() );
290 msg.setInfo2( service.getInfo2() );
291 msg.setInfo3( service.getInfo3() );
292
293 Data data = data_serialize( msg, DEFAULT_V );
294 uint8_t* pnt = data.getBuffer();
295 size_t len = data.getLength() / 8;
296
297 boost::system::error_code err;
298
299 {
300 udp::endpoint endp(udp::v4(), PeriodicBroadcast::serverport_v4);
301 endp.address( boost::asio::ip::address_v4::broadcast() );
302 socket_v4.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
303 if(err) logging_warn("failed sending message through ipv4 socket");
304 }
305 {
306 udp::endpoint endp(udp::v6(), PeriodicBroadcast::serverport_v6);
307 boost::asio::ip::address_v6 all_nodes = boost::asio::ip::address_v6::from_string("ff02::1");
308
309 // include all link-local interfaces
310 vector<uint64_t> scope_ids = ariba::transport::get_interface_scope_ids();
311
312 BOOST_FOREACH ( uint64_t id, scope_ids )
313 {
314 all_nodes.scope_id(id);
315 endp.address( all_nodes );
316
317 socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
318 if(err) logging_warn("failed sending message through ipv6 socket");
319 }
320 }
321 }
322
323 private:
324 void start_receive_4(){
325 socket_v4.async_receive_from(
326 boost::asio::buffer(recv_buffer_4), remote_endpoint_,
327 boost::bind(&udp_server::handle_receive_4, this,
328 boost::asio::placeholders::error,
329 boost::asio::placeholders::bytes_transferred));
330 }
331
332 void start_receive_6(){
333 socket_v6.async_receive_from(
334 boost::asio::buffer(recv_buffer_6), remote_endpoint_,
335 boost::bind(&udp_server::handle_receive_6, this,
336 boost::asio::placeholders::error,
337 boost::asio::placeholders::bytes_transferred));
338 }
339
340 void handle_receive_4(const boost::system::error_code& error,
341 std::size_t bytes_transferred){
342
343 if (!error || error == boost::asio::error::message_size)
344 handle_info(recv_buffer_4, bytes_transferred);
345 else
346 logging_warn("failed receiving broadcast data: " << error.message());
347
348 start_receive_4();
349 }
350
351 void handle_receive_6(const boost::system::error_code& error,
352 std::size_t bytes_transferred){
353
354 if (!error || error == boost::asio::error::message_size)
355 handle_info(recv_buffer_6, bytes_transferred);
356 else
357 logging_warn("failed receiving broadcast data: " << error.message());
358
359 start_receive_6();
360 }
361
362 void handle_info(boost::array<char, 1500>& buffer, std::size_t length){
363
364 try {
365
366 PeriodicBroadcastMessage msg;
367
368 Data data( (uint8_t*)buffer.data(), length*8 );
369 data_deserialize( msg, data );
370
371 { // insert new found service
372 boost::mutex::scoped_lock lock( *servicesmutex );
373 if(msg.getName().empty()) return;
374
375 ServiceList::iterator it = services->find( msg.getName() );
376 if( it != services->end() ){
377 it->second.setLastseen( time(NULL) );
378 } else {
379 Service s( msg.getName(), msg.getInfo1(), msg.getInfo2(), msg.getInfo3(), time(NULL));
380 services->insert( std::make_pair(msg.getName(), s) );
381 }
382 }
383
384 }catch(...){
385 /* ignore error */
386 }
387 }
388
389 void handle_send(boost::shared_ptr<std::string> /*message*/,
390 const boost::system::error_code& error,
391 std::size_t /*bytes_transferred*/){
392
393 if(error)
394 logging_warn("failed sending out message");
395 }
396 };
397
398 udp_server server;
399};
400
401}} //namespace ariba, utility
402
403#endif // __PERIODIC_BROADCAST_H
Note: See TracBrowser for help on using the repository browser.