source: source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h@ 12745

Last change on this file since 12745 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at the cost of backward compatibility.
Also, bluetooth and the bootstrap modules have not yet been ported to the new interfaces.

File size: 10.8 KB
RevLine 
[4850]1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef __PERIODIC_BROADCAST_H
40#define __PERIODIC_BROADCAST_H
41
[12060]42// ariba
[4850]43#include "ariba/config.h"
[12060]44#include "ariba/utility/bootstrap/modules/BootstrapModule.h"
45#include "ariba/utility/logging/Logging.h"
46#include "ariba/utility/system/Timer.h"
[4850]47
[12060]48// ariba messages
49#include "PeriodicBroadcastMessage.h"
50
51// (ariba) link-local
52#include "ariba/utility/transport/StreamTransport/StreamTransport.hpp"
53
54// system
[4851]55#include <map>
56#include <string>
[4866]57#include <ctime>
[4850]58#include <iostream>
[12060]59
60// boost
[4853]61#include <boost/asio.hpp>
[4851]62#include <boost/foreach.hpp>
[4850]63#include <boost/thread/mutex.hpp>
64#include <boost/thread/thread.hpp>
65
[10653]66
[4850]67using std::map;
68using std::string;
[4853]69using boost::asio::ip::udp;
[4850]70
71namespace ariba {
72namespace utility {
73
74class PeriodicBroadcast : public BootstrapModule, public Timer {
75 use_logging_h(PeriodicBroadcast);
76public:
[7532]77 PeriodicBroadcast(BootstrapInformationCallback* _callback, string info);
[4850]78 virtual ~PeriodicBroadcast();
79
80 virtual void start();
81 virtual void stop();
82
83 virtual string getName();
84 virtual string getInformation();
85 virtual bool isFunctional();
86 virtual void publishService(string name, string info1, string info2, string info3);
87 virtual void revokeService(string name);
88
89protected:
90 virtual void eventFunction();
91
92private:
93 void sendLocalServices();
94 void updateRemoteServices();
95
[4866]96 static const long timerinterval; // used to send out updates on our services and check for new services
97 static const long servicetimeout; // timeout after which a service is considered dead if we did not receive updates
[4853]98 static const unsigned int serverport_v4;
99 static const unsigned int serverport_v6;
[4850]100
// One announced service: a name, three free-form info strings and the
// time the entry was last seen.
//
// The class holds only strings and a time_t, so the compiler-generated
// member-wise copy constructor, copy assignment and destructor are used;
// they copy exactly the five members below.
class Service {
private:
    string name;
    string info1;
    string info2;
    string info3;
    time_t lastseen; // 0 unless a value was passed in or set explicitly

public:
    // Creates an empty entry: all strings empty, lastseen == 0.
    Service()
        : name(), info1(), info2(), info3(), lastseen(0) {
    }

    // Creates an entry from its parts; _lastseen defaults to 0.
    // Members are initialized directly instead of being default-constructed
    // and then overwritten.
    Service(const string& _name, const string& _info1,
            const string& _info2, const string& _info3, const time_t& _lastseen = 0)
        : name(_name), info1(_info1), info2(_info2), info3(_info3), lastseen(_lastseen) {
    }

    // Accessors return copies of the stored values.
    string getName() const {
        return name;
    }

    string getInfo1() const {
        return info1;
    }

    string getInfo2() const {
        return info2;
    }

    string getInfo3() const {
        return info3;
    }

    time_t getLastseen() const {
        return lastseen;
    }

    // Mutators overwrite the stored value; strings are taken by const
    // reference so the argument is copied only once, into the member.
    void setName(const string& _name){
        name = _name;
    }

    void setInfo1(const string& _info1){
        info1 = _info1;
    }

    void setInfo2(const string& _info2){
        info2 = _info2;
    }

    void setInfo3(const string& _info3){
        info3 = _info3;
    }

    void setLastseen(time_t _lastseen){
        lastseen = _lastseen;
    }
};
180
[4850]181 typedef map<string,Service> ServiceList;
[5516]182
[4850]183 ServiceList localServices;
184 boost::mutex localServicesMutex;
185
[4853]186 ServiceList remoteServices;
187 boost::mutex remoteServicesMutex;
188
189 ServiceList newRemoteServices;
190 boost::mutex newRemoteServicesMutex;
191
192 boost::asio::io_service io_service;
[4924]193 boost::thread* io_service_thread;
194 static void threadFunc(PeriodicBroadcast* obj);
[4853]195
196 class udp_server {
197 private:
[4920]198 udp::socket socket_v4;
199 udp::socket socket_v6;
[4853]200 udp::endpoint remote_endpoint_;
[5464]201 boost::array<char, 1500> recv_buffer_4;
202 boost::array<char, 1500> recv_buffer_6;
[4853]203 ServiceList* services;
204 boost::mutex* servicesmutex;
205
206 public:
207 udp_server(boost::asio::io_service& io_service, ServiceList* _services, boost::mutex* _servicesmutex)
[6919]208 : socket_v4(io_service), socket_v6(io_service),
209 services(_services), servicesmutex(_servicesmutex) {
[4853]210
[5516]211 if( open4() ) start_receive_4();
212 if( open6() ) start_receive_6();
213 }
214
215 bool open4(){
216 boost::system::error_code err;
217
[4872]218 boost::asio::ip::udp::endpoint listen_endpoint_v4(
[4920]219 boost::asio::ip::address_v4::any(),
[4872]220 PeriodicBroadcast::serverport_v4);
[4853]221
[5516]222 err = socket_v4.open( listen_endpoint_v4.protocol(), err );
223 if(err){
224 logging_warn("failed opening ipv4 socket");
225 return false;
226 }
227
228 err = socket_v4.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
229 if(err){
230 logging_warn("failed setting reuse address option on ipv4 socket");
231 return false;
232 }
233
234 err = socket_v4.set_option( boost::asio::socket_base::broadcast(true), err );
235 if(err){
236 logging_warn("failed setting broadcast option on ipv4 socket");
237 return false;
238 }
239
240 err = socket_v4.bind( listen_endpoint_v4, err );
241 if(err){
242 logging_warn("failed binding ipv4 socket");
243 return false;
244 }
245
246 return true;
247 }
248
249 bool open6(){
250 boost::system::error_code err;
251
[4872]252 boost::asio::ip::udp::endpoint listen_endpoint_v6(
[4920]253 boost::asio::ip::address_v6::any(),
[4872]254 PeriodicBroadcast::serverport_v6);
255
[4920]256 err = socket_v6.open( listen_endpoint_v6.protocol(), err );
[5516]257 if(err){
258 logging_warn("failed opening ipv6 socket");
259 return false;
260 }
[4872]261
[4920]262 err = socket_v6.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
[5516]263 if(err){
264 logging_warn("failed setting reuse address option on ipv6 socket");
265 return false;
266 }
[4896]267
[4920]268 err = socket_v6.set_option( boost::asio::socket_base::broadcast(true), err );
[5516]269 if(err){
270 logging_warn("failed setting broadcast option on ipv6 socket");
271 return false;
272 }
[4896]273
[4921]274 err = socket_v6.bind( listen_endpoint_v6, err );
[5516]275 if(err){
276 logging_warn("failed binding ipv6 socket");
277 return false;
278 }
[4920]279
[5516]280 return true;
[4853]281 }
282
283 void sendservice(Service service){
284
[5479]285 PeriodicBroadcastMessage msg;
[7532]286 if(service.getName().empty()) return;
[5479]287
[5531]288 msg.setName( service.getName() );
289 msg.setInfo1( service.getInfo1() );
290 msg.setInfo2( service.getInfo2() );
291 msg.setInfo3( service.getInfo3() );
[5479]292
[4853]293 Data data = data_serialize( msg, DEFAULT_V );
294 uint8_t* pnt = data.getBuffer();
[4866]295 size_t len = data.getLength() / 8;
[4853]296
[4896]297 boost::system::error_code err;
[4866]298
[4853]299 {
300 udp::endpoint endp(udp::v4(), PeriodicBroadcast::serverport_v4);
301 endp.address( boost::asio::ip::address_v4::broadcast() );
[4920]302 socket_v4.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
[4896]303 if(err) logging_warn("failed sending message through ipv4 socket");
[4853]304 }
[4920]305 {
[4853]306 udp::endpoint endp(udp::v6(), PeriodicBroadcast::serverport_v6);
[10653]307 boost::asio::ip::address_v6 all_nodes = boost::asio::ip::address_v6::from_string("ff02::1");
308
309 // include all link-local interfaces
[12060]310 vector<uint64_t> scope_ids = ariba::transport::get_interface_scope_ids();
[10653]311
312 BOOST_FOREACH ( uint64_t id, scope_ids )
313 {
314 all_nodes.scope_id(id);
315 endp.address( all_nodes );
316
317 socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
318 if(err) logging_warn("failed sending message through ipv6 socket");
319 }
[4920]320 }
[4853]321 }
322
323 private:
[5465]324 void start_receive_4(){
[4920]325 socket_v4.async_receive_from(
[5464]326 boost::asio::buffer(recv_buffer_4), remote_endpoint_,
327 boost::bind(&udp_server::handle_receive_4, this,
[4853]328 boost::asio::placeholders::error,
329 boost::asio::placeholders::bytes_transferred));
[5465]330 }
[4920]331
[5465]332 void start_receive_6(){
[4853]333 socket_v6.async_receive_from(
[5464]334 boost::asio::buffer(recv_buffer_6), remote_endpoint_,
335 boost::bind(&udp_server::handle_receive_6, this,
[4853]336 boost::asio::placeholders::error,
337 boost::asio::placeholders::bytes_transferred));
338 }
339
[5464]340 void handle_receive_4(const boost::system::error_code& error,
[4853]341 std::size_t bytes_transferred){
342
[5464]343 if (!error || error == boost::asio::error::message_size)
344 handle_info(recv_buffer_4, bytes_transferred);
345 else
346 logging_warn("failed receiving broadcast data: " << error.message());
[4853]347
[5465]348 start_receive_4();
[5464]349 }
[4853]350
[5464]351 void handle_receive_6(const boost::system::error_code& error,
352 std::size_t bytes_transferred){
[4853]353
[5464]354 if (!error || error == boost::asio::error::message_size)
355 handle_info(recv_buffer_6, bytes_transferred);
356 else
357 logging_warn("failed receiving broadcast data: " << error.message());
[4853]358
[5465]359 start_receive_6();
[5464]360 }
[5421]361
[5464]362 void handle_info(boost::array<char, 1500>& buffer, std::size_t length){
[4853]363
[5973]364 try {
[4896]365
[5973]366 PeriodicBroadcastMessage msg;
[4896]367
[5973]368 Data data( (uint8_t*)buffer.data(), length*8 );
369 data_deserialize( msg, data );
[5464]370
[5973]371 { // insert new found service
[6919]372 boost::mutex::scoped_lock lock( *servicesmutex );
[7532]373 if(msg.getName().empty()) return;
[5464]374
[5973]375 ServiceList::iterator it = services->find( msg.getName() );
376 if( it != services->end() ){
377 it->second.setLastseen( time(NULL) );
378 } else {
379 Service s( msg.getName(), msg.getInfo1(), msg.getInfo2(), msg.getInfo3(), time(NULL));
380 services->insert( std::make_pair(msg.getName(), s) );
381 }
[5464]382 }
[5973]383
384 }catch(...){
385 /* ignore error */
[4853]386 }
387 }
388
389 void handle_send(boost::shared_ptr<std::string> /*message*/,
[4896]390 const boost::system::error_code& error,
[4853]391 std::size_t /*bytes_transferred*/){
[4920]392
[4896]393 if(error)
394 logging_warn("failed sending out message");
[4853]395 }
396 };
397
398 udp_server server;
[4850]399};
400
401}} //namespace ariba, utility
402
403#endif // __PERIODIC_BROADCAST_H
Note: See TracBrowser for help on using the repository browser.