source: source/ariba/utility/transport/tcpip/tcpip.cpp@ 10653

Last change on this file since 10653 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 13.4 KB
Line 
1#include "tcpip.hpp"
2
3#include <boost/array.hpp>
4
5// interface discovery for link-local destinations
6#include <ifaddrs.h>
7
8namespace ariba {
9namespace transport {
10
11use_logging_cpp(tcpip)
12
13using namespace ariba::addressing;
14
15typedef boost::mutex::scoped_lock unique_lock;
16
17tcpip::tcpip( const tcp::endpoint& endp ) :
18 listener(NULL),
19 acceptor(u_io_service.get_asio_io_service(), endp)
20{
21}
22
23tcpip::~tcpip(){}
24
25void tcpip::start()
26{
27 // open server socket
28 accept();
29
30 u_io_service.start();
31}
32
33
34void tcpip::stop()
35{
36 acceptor.close();
37
38 u_io_service.stop();
39}
40
41
42/* see header file for comments */
43void tcpip::send(
44 const tcp::endpoint& dest_addr,
45 reboost::message_t message,
46 uint8_t priority)
47{
48 ConnPtr conn;
49 bool need_to_connect = false;
50
51 {
52 unique_lock lock(connections_lock);
53
54 ConnectionMap::iterator it = connections.find(dest_addr);
55 if (it == connections.end())
56 {
57 ConnPtr tmp_ptr(
58 new tcpip_connection(
59 u_io_service.get_asio_io_service(),
60 shared_from_this() )
61 );
62 conn = tmp_ptr;
63
64 conn->partner = dest_addr;
65 conn->remote = convert_address(dest_addr);
66
67 // Note: starting the send is the obligation of the connect_handler
68 // (avoids trying to send while not connected yet)
69 conn->sending = true;
70 need_to_connect = true;
71
72 ConnectionMap::value_type item(dest_addr, conn);
73 connections.insert(item);
74
75 } else {
76 conn = it->second;
77 }
78 }
79
80
81 // * the actual send *
82 conn->enqueue_for_sending(message, priority);
83
84 // if new connection connect to the other party
85 if ( need_to_connect )
86 {
87 conn->sock.async_connect(
88 dest_addr,
89 boost::bind(
90 &tcpip_connection::async_connect_handler,
91 conn,
92 boost::asio::placeholders::error));
93 }
94}
95
96
97/* see header file for comments */
98void tcpip::send(
99 const address_v* remote,
100 reboost::message_t message,
101 uint8_t priority)
102{
103 send(convert_address(remote), message, priority);
104}
105
106
107/* see header file for comments */
108void tcpip::send(
109 const endpoint_set& endpoints,
110 reboost::message_t message,
111 uint8_t priority )
112{
113 // network interfaces scope_ids, for link-local connections (lazy initialization)
114 vector<uint64_t> scope_ids;
115
116 // send a message to each combination of address-address and port
117 BOOST_FOREACH( const ip_address address, endpoints.ip ) {
118 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
119 tcp::endpoint endp(address.asio(), port.asio());
120
121 // special treatment for link local addresses
122 // ---> send over all (suitable) interfaces
123 if ( endp.address().is_v6() )
124 {
125 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
126
127 if ( v6_addr.is_link_local() )
128 {
129 // initialize scope_ids
130 if ( scope_ids.size() == 0 )
131 scope_ids = get_interface_scope_ids();
132
133 BOOST_FOREACH ( uint64_t id, scope_ids )
134 {
135 v6_addr.scope_id(id);
136 endp.address(v6_addr);
137
138 logging_debug("------> SEND TO (link-local): " << endp);
139 // * send *
140 send(endp, message, priority);
141 }
142 }
143
144 continue;
145 }
146
147 // * send *
148 send(endp, message, priority);
149 }
150 }
151}
152
153
154void tcpip::register_listener( transport_listener* listener )
155{
156 this->listener = listener;
157}
158
159
160void tcpip::terminate( const address_v* remote )
161{
162 terminate(convert_address(remote));
163}
164
165void tcpip::terminate( const tcp::endpoint& remote )
166{
167 ConnPtr conn;
168
169 // find and forget connection
170 {
171 unique_lock lock(connections_lock);
172
173 ConnectionMap::iterator it = connections.find(remote);
174 if (it == connections.end())
175 {
176 return;
177 }
178
179 conn = it->second;
180
181 connections.erase(it);
182 }
183
184 // close connection
185 boost::system::error_code ec;
186 conn->sock.shutdown(tcp::socket::shutdown_both, ec);
187 conn->sock.close(ec);
188}
189
190
191/* private */
192void tcpip::accept()
193{
194 // create new connection object
195 ConnPtr conn(
196 new tcpip_connection(
197 u_io_service.get_asio_io_service(),
198 shared_from_this()
199 )
200 );
201
202 // wait for incoming connection
203 acceptor.async_accept(
204 conn->sock,
205 boost::bind(&self::async_accept_handler,
206 this->shared_from_this(),
207 conn,
208 boost::asio::placeholders::error)
209 );
210}
211
212void tcpip::async_accept_handler(ConnPtr conn, const error_code& error)
213{
214 if ( ! error )
215 {
216 conn->partner = conn->sock.remote_endpoint();
217 conn->remote = convert_address(conn->partner);
218 conn->local = convert_address(conn->sock.local_endpoint());
219
220 {
221 unique_lock lock(connections_lock);
222
223 ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
224 connections.insert(item);
225 }
226
227 // read
228 conn->listen();
229 }
230
231 // accept further connections
232 accept();
233}
234
235inline tcp::endpoint tcpip::convert_address( const address_v* address )
236{
237 tcpip_endpoint endpoint = *address;
238
239 return tcp::endpoint(
240 endpoint.address().asio(), endpoint.port().value()
241 );
242}
243
244
245inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint)
246{
247 ip_address address;
248 address.asio(endpoint.address());
249 tcp_port_address port;
250 port.value(endpoint.port());
251 return tcpip_endpoint(address, port);
252}
253
254
255vector<uint64_t> tcpip::get_interface_scope_ids()
256{
257 vector<uint64_t> ret;
258
259 struct ifaddrs* ifaceBuffer = NULL;
260 void* tmpAddrPtr = NULL;
261
262 int ok = getifaddrs( &ifaceBuffer );
263 if( ok != 0 ) return ret;
264
265 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
266
267 // ignore devices that are disabled or have no ip
268 if(i == NULL) continue;
269 struct sockaddr* addr = i->ifa_addr;
270 if (addr==NULL) continue;
271
272 // only use ethX and wlanX devices
273 string device = string(i->ifa_name);
274 if ( (device.find("eth") == string::npos) &&
275 (device.find("wlan") == string::npos) /* &&
276 (device.find("lo") == string::npos) XXX */ )
277 {
278 continue;
279 }
280
281 // only use interfaces with ipv6 link-local addresses
282 if (addr->sa_family == AF_INET6)
283 {
284 // convert address
285 // TODO should be possible without detour over strings
286 char straddr[INET6_ADDRSTRLEN];
287 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
288 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
289
290 address_v6 v6addr = address_v6::from_string(straddr);
291 if ( v6addr.is_link_local() )
292 {
293 // * append the scope_id to the return vector *
294 ret.push_back(if_nametoindex(i->ifa_name));
295 }
296
297 }
298 }
299
300 freeifaddrs(ifaceBuffer);
301
302 return ret;
303}
304
305
306/*****************
307 ** inner class **
308 *****************/
309
310tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent) :
311 sock(io_service),
312 valid(true),
313 parent(parent),
314 out_queues(8), //TODO How much priorities shall we have?
315 sending(false)
316{
317 header.length = 0;
318 header.prot = 0;
319}
320
321/*-------------------------------------------
322 | implement transport_connection interface |
323 -------------------------------------------*/
324void tcpip::tcpip_connection::send(
325 reboost::message_t message,
326 uint8_t priority)
327{
328 enqueue_for_sending(message, priority);
329}
330
331
332address_vf tcpip::tcpip_connection::getLocalEndpoint()
333{
334 return local;
335}
336
337
338address_vf tcpip::tcpip_connection::getRemoteEndpoint()
339{
340 return remote;
341}
342
343
344void tcpip::tcpip_connection::terminate()
345{
346 parent->terminate(partner);
347}
348
349
350/*------------------------------
351 | things we defined ourselves |
352 ------------------------------*/
353void tcpip::tcpip_connection::async_connect_handler(const error_code& error)
354{
355 if (error)
356 {
357 parent->terminate(partner);
358
359 return;
360 }
361
362 // save address in ariba format
363 local = parent->convert_address(sock.local_endpoint());
364
365 // Note: sending has to be true at this point
366 send_next_package();
367
368 listen();
369}
370
371
372void tcpip::tcpip_connection::listen()
373{
374 boost::asio::async_read(
375 this->sock,
376 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
377 boost::bind(
378 &tcpip::tcpip_connection::async_read_header_handler,
379 this->shared_from_this(),
380 boost::asio::placeholders::error,
381 boost::asio::placeholders::bytes_transferred
382 )
383 );
384}
385
386
387void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
388{
389 if (error)
390 {
391 parent->terminate(partner);
392
393 return;
394 }
395
396 // convert byte order
397 header.length = ntohl(header.length);
398 header.length -= 2; // XXX protlib
399
400 assert(header.length > 0);
401
402 // new buffer for the new packet
403 buffy = shared_buffer_t(header.length);
404
405 // * read data *
406 boost::asio::async_read(
407 this->sock,
408 boost::asio::buffer(buffy.mutable_data(), buffy.size()),
409 boost::bind(
410 &tcpip::tcpip_connection::async_read_data_handler,
411 this->shared_from_this(),
412 boost::asio::placeholders::error,
413 boost::asio::placeholders::bytes_transferred
414 )
415 );
416}
417
418void tcpip::tcpip_connection::async_read_data_handler(
419 const error_code& error, size_t bytes_transferred)
420{
421 if (error)
422 {
423 parent->terminate(partner);
424
425 return;
426 }
427
428 message_t msg;
429 msg.push_back(buffy);
430 buffy = shared_buffer_t();
431
432 if ( parent->listener )
433 parent->listener->receive_message(shared_from_this(), msg);
434
435 listen();
436}
437
438/* see header file for comments */
439void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
440{
441 if ( error )
442 {
443 // remove this connection
444 parent->terminate(partner);
445
446 return;
447 }
448
449 send_next_package();
450}
451
452
453
454void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority)
455{
456 bool restart_sending = false;
457
458 // enqueue packet [locked]
459 {
460 unique_lock(out_queues_lock);
461
462 assert( priority < out_queues.size() );
463 out_queues[priority].push(packet);
464
465 if ( ! sending )
466 {
467 restart_sending = true;
468 sending = true;
469 }
470 }
471
472 // if sending was stopped, we have to restart it here
473 if ( restart_sending )
474 {
475 send_next_package();
476 }
477}
478
479/* see header file for comments */
480void tcpip::tcpip_connection::send_next_package()
481{
482 Packet packet;
483 bool found = false;
484
485 // find packet with highest priority [locked]
486 {
487 unique_lock(out_queues_lock);
488
489 for ( vector<OutQueue>::iterator it = out_queues.begin();
490 it != out_queues.end(); it++ )
491 {
492 if ( !it->empty() )
493 {
494 packet = it->front();
495 it->pop();
496 found = true;
497
498 break;
499 }
500 }
501
502 // no packets waiting --> stop sending
503 if ( ! found )
504 {
505 sending = false;
506 }
507 }
508
509 // * send *
510 if ( found )
511 {
512 reboost::shared_buffer_t header_buf(sizeof(header_t));
513 header_t* header = (header_t*)(header_buf.mutable_data());
514 header->length = htonl(packet.size()+2); // XXX protlib
515
516 packet.push_front(header_buf);
517
518 // "convert" message to asio buffer sequence
519 vector<boost::asio::const_buffer> send_sequence(packet.length());
520 for ( int i=0; i < packet.length(); i++ )
521 {
522 shared_buffer_t b = packet.at(i);
523 send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
524 }
525
526 // * async write *
527 boost::asio::async_write(
528 this->sock,
529 send_sequence,
530 boost::bind(
531 &tcpip::tcpip_connection::async_write_handler,
532 this->shared_from_this(),
533 packet, // makes sure our shared pointer lives long enough ;-)
534 boost::asio::placeholders::error,
535 boost::asio::placeholders::bytes_transferred)
536 );
537 }
538}
539
540}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.