source: source/ariba/utility/addressing2/endpoint_set.cpp@ 12458

Last change on this file since 12458 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 8.5 KB
Line 
1/*
2 * endpointset.cpp
3 *
4 * Created on: 27.03.2013
5 * Author: mario
6 */
7
8#include "endpoint_set.hpp"
9
10// ariba endpoints
11#include "tcpip_endpoint.hpp"
12
13// boost
14#include <boost/foreach.hpp>
15#include <boost/property_tree/json_parser.hpp>
16
17#include <limits>
18
19namespace ariba {
20namespace addressing2 {
21
22using namespace std;
23using boost::property_tree::ptree;
24
25// TODO maybe this function wants to go in a distinct header..
26template <class T>
27T read_from_byte_array(const uint8_t*& buff, int& read_max)
28{
29 assert ( sizeof(T) <= read_max );
30
31 const uint8_t* b1 = buff;
32 buff += sizeof(T);
33 read_max -= sizeof(T);
34
35 return *( reinterpret_cast<const T *>(b1) );
36}
37
38
39/* factories */
40//shared_ptr<endpoint_set> endpoint_set::create_EndpointSet(const std::string & str)
41//{
42// EndpointSetPtr set(new endpoint_set(str));
43//
44// return set;
45//}
46
47shared_ptr<endpoint_set> endpoint_set::create_EndpointSet(const ptree& pt)
48{
49 EndpointSetPtr set(new endpoint_set(pt));
50
51 return set;
52}
53
54shared_ptr<endpoint_set> endpoint_set::create_EndpointSet()
55{
56 EndpointSetPtr set(new endpoint_set());
57
58 return set;
59}
60
61
62
63endpoint_set::endpoint_set()
64{
65}
66
67endpoint_set::endpoint_set(const ptree& pt)
68{
69 /* create & add endpoints */
70 BOOST_FOREACH( const ptree::value_type& child, pt )
71 {
72 string cat = child.second.get<string>("category");
73
74 // TCPIP
75 if ( cat == "TCPIP" )
76 {
77 string addr = child.second.get<string>("addr");
78 int port = child.second.get<int>("port");
79
80 TcpIP_EndpointPtr endp(new tcpip_endpoint(addr, port));
81
82 tcpip_endpoints.push_back(endp);
83 }
84
85 // TODO else if bluetooth
86 }
87}
88
89//endpoint_set::endpoint_set(const string& str)
90//{
91// // TODO see endpoint_set(const ptree& pt)
92//
93//
94// ptree pt;
95//
96// /* parse input string */
97// // input string format is: JSON
98// if (str.substr(0, 4) == "JSON")
99// {
100// istringstream sstream(str.substr(4, string::npos));
101// boost::property_tree::json_parser::read_json(sstream, pt);
102// }
103// // --- other formats can be supported here (e.g. XML) ---
104// else
105// {
106// throw invalid_argument("Could not parse endpoint_set from string.");
107// }
108//
109//
110//
111// /* create & add endpoints */
112// BOOST_FOREACH(const ptree::value_type& child, pt.get_child("endpoint_set"))
113// {
114// string cat = child.second.get<string>("category");
115//
116// // TCPIP
117// if ( cat == "TCPIP" )
118// {
119// string addr = child.second.get<string>("addr");
120// int port = child.second.get<int>("port");
121//
122// TcpIP_EndpointPtr endp(new tcpip_endpoint(addr, port));
123//
124// tcpip_endpoints.push_back(endp);
125// }
126//
127// // TODO else if bluetooth
128// }
129//}
130
131endpoint_set::~endpoint_set()
132{
133}
134
135void endpoint_set::add_endpoint(EndpointPtr endpoint)
136{
137 switch ( endpoint->get_category() )
138 {
139 case endpoint_category::TCPIP:
140 {
141 // TODO try-catch --> log a warning
142 shared_ptr<tcpip_endpoint> tcpip_endp =
143 boost::dynamic_pointer_cast<tcpip_endpoint>(endpoint);
144
145 // no duplicates
146 bool duplicate = false;
147 BOOST_FOREACH(const shared_ptr<tcpip_endpoint>& x, tcpip_endpoints)
148 {
149 if ( tcpip_endp->equals(x) )
150 {
151 duplicate = true;
152 break;
153 }
154 }
155
156 if ( ! duplicate )
157 {
158 // * add *
159 tcpip_endpoints.push_back(tcpip_endp);
160 }
161
162 break;
163 }
164
165 default:
166 {
167 // TODO log a warning ^^
168
169 break;
170 }
171 }
172}
173
174void endpoint_set::add_endpoints(const shared_ptr<endpoint_set> endpoints)
175{
176 // TODO bluetooth, etc....
177
178 BOOST_FOREACH( EndpointPtr endp, endpoints->get_tcpip_endpoints() )
179 {
180 add_endpoint(endp);
181 }
182}
183
184
185const vector<shared_ptr<tcpip_endpoint> >& endpoint_set::get_tcpip_endpoints() const
186{
187 return tcpip_endpoints;
188}
189
190//const vector<shared_ptr<const tcpip_endpoint> > endpoint_set::get_tcpip_endpoints() const
191//{
192// vector<shared_ptr<const tcpip_endpoint> > ret;
193// ret.reserve(tcpip_endpoints.size());
194//
195// BOOST_FOREACH( const_TcpIP_EndpointPtr address, tcpip_endpoints )
196// {
197// ret.push_back(address);
198// }
199//
200// return ret;
201//}
202
203
204string endpoint_set::to_string() const
205{
206 ostringstream out;
207
208 BOOST_FOREACH( EndpointPtr endp, tcpip_endpoints )
209 {
210 out << endp->to_string() << "; ";
211 }
212
213 return out.str();
214}
215
216
217/**
218 * Format:
219 *
220 * | 16-bit: overall size | + ( | 8 bit: type | + | variable length: endpoint | ) * N
221 *
222 */
223reboost::shared_buffer_t endpoint_set::serialize() const
224{
225 // TODO bluetooth, etc....
226
227 /* calculate size */
228 // size: two byte length field
229 size_t overall_size = sizeof(uint16_t);
230
231 BOOST_FOREACH( EndpointPtr endp, tcpip_endpoints )
232 {
233 // size: endpoint + type
234 overall_size += endp->size() + 1;
235 }
236
237 // overall_size value must fit into 16 bits
238 assert ( overall_size <= numeric_limits<uint16_t>::max() );
239
240
241 /* serialize */
242 reboost::shared_buffer_t buff(overall_size);
243 uint8_t* buff_ptr = buff.mutable_data();
244
245 // overall size
246 memcpy( buff_ptr, &overall_size, sizeof(uint16_t) );
247 buff_ptr += sizeof(uint16_t);
248
249 // tcpip_endpoints
250 BOOST_FOREACH( EndpointPtr endp, tcpip_endpoints )
251 {
252// // XXX AKTUELL BUG FINDING...
253// cout << " - SERIALIZE: (" << (int) (buff_ptr - buff.mutable_data()) << ")";
254
255 // type
256 *buff_ptr = static_cast<uint8_t>(endp->get_type());
257 buff_ptr++;
258
259 // serialize tcpip_endpoint
260 buff_ptr += endp->to_byte_array(buff_ptr);
261
262// // XXX AKTUELL BUG FINDING...
263// cout << endp->to_string() << " (" << (int) (buff_ptr - buff.mutable_data()) << ")" << endl;
264 }
265
266 // boundary check
267 assert( buff_ptr <= buff.mutable_data() + buff.size() );
268
269 return buff;
270}
271
272reboost::shared_buffer_t endpoint_set::deserialize(reboost::shared_buffer_t buff)
273{
274 assert( tcpip_endpoints.size() == 0);
275
276 const uint8_t* buff_ptr = buff.data(); // NOTE: the data is const, the pointer is not.
277
278 // read overall size (16 bit value)
279 int bytes_left = sizeof(uint16_t);
280 uint16_t overall_size = read_from_byte_array<uint16_t>(buff_ptr, bytes_left);
281
282 // check claimed overall size
283 if ( overall_size > buff.size() )
284 {
285 // todo throw
286 cout << endl << "FATAL ERROR in »endpoint_set::deserialize«: overall_size > buff.size()" << endl;
287 assert ( false );
288 }
289
290 // calculate bytes to read
291 bytes_left = overall_size - sizeof(uint16_t);
292
293
294 // read endpoints
295 while ( bytes_left > 0 )
296 {
297// // XXX AKTUELL BUG FINDING...
298// cout << " - DESERIALIZE: (" << (int) (buff_ptr - buff.data()) << ")";
299
300 ENDPOINT_TYPE type = static_cast<ENDPOINT_TYPE>(
301 read_from_byte_array<uint8_t>(buff_ptr, bytes_left) );
302
303 switch (type)
304 {
305 case endpoint_type::TCPIPv4:
306 case endpoint_type::TCPIPv6:
307 {
308 // TODO try catch
309 TcpIP_EndpointPtr endp(new tcpip_endpoint(type, buff_ptr, bytes_left));
310 add_endpoint(endp);
311
312 // move pointers
313 const int bytes_read = endp->size();
314 buff_ptr += bytes_read;
315 bytes_left -= bytes_read;
316
317
318// // XXX AKTUELL BUG FINDING...
319// cout << endp->to_string() << " (" << (int) (buff_ptr - buff.data()) << ")" << endl;
320
321 break;
322 }
323
324 // TODO case endpoint_type::bluetooth...:
325
326 default:
327 {
328 // TODO throw
329 cout << endl << "FATAL ERROR in »endpoint_set::deserialize«: Unknown type (" << type << ")" << endl;
330 cout << "BUFFER (size = " << buff.size() << "):" << endl;
331 cout << buff << endl << "-------------------------" << endl;
332 assert(false);
333 break;
334 }
335 }
336
337 assert( bytes_left >= 0 );
338 }
339
340 // return sub-buffer
341 return buff(overall_size);
342}
343
344}} /* namespace addressing2::ariba */
Note: See TracBrowser for help on using the repository browser.