An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h @ 5464

Last change on this file since 5464 was 5464, checked in by Christoph Mayer, 14 years ago

-periodic broadcast fix

File size: 8.9 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef __PERIODIC_BROADCAST_H
40#define __PERIODIC_BROADCAST_H
41
42#include "ariba/config.h"
43
44#include <map>
45#include <string>
46#include <ctime>
47#include <iostream>
48#include <boost/asio.hpp>
49#include <boost/foreach.hpp>
50#include <boost/thread/mutex.hpp>
51#include <boost/thread/thread.hpp>
52#include "ariba/utility/bootstrap/modules/BootstrapModule.h"
53#include "ariba/utility/logging/Logging.h"
54#include "ariba/utility/system/Timer.h"
55#include "PeriodicBroadcastMessage.h"
56
57using std::map;
58using std::string;
59using std::cout;
60using boost::asio::ip::udp;
61
62namespace ariba {
63namespace utility {
64
65class PeriodicBroadcast : public BootstrapModule, public Timer {
66        use_logging_h(PeriodicBroadcast);
67public:
68        PeriodicBroadcast(BootstrapInformationCallback* _callback);
69        virtual ~PeriodicBroadcast();
70
71        virtual void start();
72        virtual void stop();
73
74        virtual string getName();
75        virtual string getInformation();
76        virtual bool isFunctional();
77        virtual void publishService(string name, string info1, string info2, string info3);
78        virtual void revokeService(string name);
79
80protected:
81        virtual void eventFunction();
82
83private:
84        void sendLocalServices();
85        void updateRemoteServices();
86
87        static const long timerinterval; // used to send out updates on our services and check for new services
88        static const long servicetimeout; // timeout after that a service is dead when we did not receive updates
89        static const unsigned int serverport_v4;
90        static const unsigned int serverport_v6;
91
92        typedef struct _Service {
93                string name;
94                string info1;
95                string info2;
96                string info3;
97                time_t lastseen;
98
99                _Service()
100                        : name(""), info1(""), info2(""), info3(""), lastseen(0){
101                }
102
103                _Service(const string& _name, const string& _info1,
104                                const string& _info2, const string& _info3, const time_t& _lastseen = 0)
105                        : name(_name), info1(_info1), info2(_info2), info3(_info3), lastseen(_lastseen){
106                }
107
108                _Service(const _Service& rh)
109                        : name(rh.name), info1(rh.info1), info2(rh.info2),
110                                                info3(rh.info3), lastseen(rh.lastseen){
111                }
112        } Service;
113
114        typedef map<string,Service> ServiceList;
115        ServiceList localServices;
116        boost::mutex localServicesMutex;
117
118        ServiceList remoteServices;
119        boost::mutex remoteServicesMutex;
120
121        ServiceList newRemoteServices;
122        boost::mutex newRemoteServicesMutex;
123
124        boost::asio::io_service io_service;
125        boost::thread* io_service_thread;
126        static void threadFunc(PeriodicBroadcast* obj);
127
128        class udp_server {
129        private:
130                udp::socket socket_v4;
131                udp::socket socket_v6;
132                udp::endpoint remote_endpoint_;
133                boost::array<char, 1500> recv_buffer_4;
134                boost::array<char, 1500> recv_buffer_6;
135                ServiceList* services;
136                boost::mutex* servicesmutex;
137
138        public:
139                udp_server(boost::asio::io_service& io_service, ServiceList* _services, boost::mutex* _servicesmutex)
140                        : services(_services), servicesmutex(_servicesmutex),
141                                socket_v4(io_service), socket_v6(io_service) {
142
143                        boost::asio::ip::udp::endpoint listen_endpoint_v4(
144                                        boost::asio::ip::address_v4::any(),
145                                        PeriodicBroadcast::serverport_v4);
146
147                        boost::asio::ip::udp::endpoint listen_endpoint_v6(
148                                        boost::asio::ip::address_v6::any(),
149                                        PeriodicBroadcast::serverport_v6);
150
151                        boost::system::error_code err;
152
153                        err = socket_v4.open( listen_endpoint_v4.protocol(), err );
154                        if(err) logging_warn("failed opening ipv4 socket");
155
156                        err = socket_v6.open( listen_endpoint_v6.protocol(), err );
157                        if(err) logging_warn("failed opening ipv6 socket");
158
159                        err = socket_v4.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
160                        if(err) logging_warn("failed setting reuse address option on ipv4 socket");
161
162                        err = socket_v6.set_option( boost::asio::ip::udp::socket::reuse_address(true), err );
163                        if(err) logging_warn("failed setting reuse address option on ipv6 socket");
164
165                        err = socket_v4.set_option( boost::asio::socket_base::broadcast(true), err );
166                        if(err) logging_warn("failed setting broadcast option on ipv4 socket");
167
168                        err = socket_v6.set_option( boost::asio::socket_base::broadcast(true), err );
169                        if(err) logging_warn("failed setting broadcast option on ipv6 socket");
170
171                        err = socket_v4.bind( listen_endpoint_v4, err );
172                        if(err) logging_warn("failed binding ipv4 socket");
173
174                        err = socket_v6.bind( listen_endpoint_v6, err );
175                        if(err) logging_warn("failed binding ipv6 socket");
176
177                        start_receive();
178                }
179
180                void sendservice(Service service){
181
182                        PeriodicBroadcastMessage msg( service.name, service.info1, service.info2, service.info3 );
183                        Data data = data_serialize( msg, DEFAULT_V );
184                        uint8_t* pnt = data.getBuffer();
185                        size_t len = data.getLength() / 8;
186
187                        boost::system::error_code err;
188
189                        {
190                                udp::endpoint endp(udp::v4(), PeriodicBroadcast::serverport_v4);
191                                endp.address( boost::asio::ip::address_v4::broadcast() );
192                                socket_v4.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
193                                if(err) logging_warn("failed sending message through ipv4 socket");
194                        }
195                        {
196                                udp::endpoint endp(udp::v6(), PeriodicBroadcast::serverport_v6);
197                                endp.address( boost::asio::ip::address_v6::from_string("ff02::1") );
198                                socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err );
199                                if(err) logging_warn("failed sending message through ipv6 socket");
200                        }
201                }
202
203        private:
204                void start_receive(){
205                        socket_v4.async_receive_from(
206                                        boost::asio::buffer(recv_buffer_4), remote_endpoint_,
207                                        boost::bind(&udp_server::handle_receive_4, this,
208                                                        boost::asio::placeholders::error,
209                                                        boost::asio::placeholders::bytes_transferred));
210
211                        socket_v6.async_receive_from(
212                                        boost::asio::buffer(recv_buffer_6), remote_endpoint_,
213                                        boost::bind(&udp_server::handle_receive_6, this,
214                                                        boost::asio::placeholders::error,
215                                                        boost::asio::placeholders::bytes_transferred));
216                }
217
218                void handle_receive_4(const boost::system::error_code& error,
219                                std::size_t bytes_transferred){
220
221                        if (!error || error == boost::asio::error::message_size)
222                                handle_info(recv_buffer_4, bytes_transferred);
223                        else
224                                logging_warn("failed receiving broadcast data: " << error.message());
225
226                        start_receive();
227                }
228
229                void handle_receive_6(const boost::system::error_code& error,
230                                std::size_t bytes_transferred){
231
232                        if (!error || error == boost::asio::error::message_size)
233                                handle_info(recv_buffer_6, bytes_transferred);
234                        else
235                                logging_warn("failed receiving broadcast data: " << error.message());
236
237                        start_receive();
238                }
239
240                void handle_info(boost::array<char, 1500>& buffer, std::size_t length){
241                        PeriodicBroadcastMessage msg;
242
243                        Data data( (uint8_t*)buffer.data(), length*8 );
244                        data_deserialize( msg, data );
245
246                        { // insert new found service
247                                boost::mutex::scoped_lock( *servicesmutex );
248
249                                ServiceList::iterator it = services->find( msg.getName() );
250                                if( it != services->end() ){
251
252                                        it->second.info1 = msg.getInfo1();
253                                        it->second.info2 = msg.getInfo2();
254                                        it->second.info3 = msg.getInfo3();
255                                        it->second.lastseen = time(NULL);
256
257                                } else {
258                                        Service s( msg.getName(), msg.getInfo1(), msg.getInfo2(), msg.getInfo3(), time(NULL));
259                                        services->insert( std::make_pair(msg.getName(), s) );
260                                }
261                        }
262                }
263
264                void handle_send(boost::shared_ptr<std::string> /*message*/,
265                                const boost::system::error_code& error,
266                                std::size_t /*bytes_transferred*/){
267
268                        if(error)
269                                logging_warn("failed sending out message");
270
271                }
272
273        };
274
275        udp_server server;
276
277};
278
279}} //namespace ariba, utility
280
281#endif // __BLUETOOTH_SDP_H
Note: See TracBrowser for help on using the repository browser.