close Warning: Can't use blame annotator:
No changeset 10924 in the repository

source: source/ariba/utility/transport/StreamTransport/StreamTransport.cpp@ 12063

Last change on this file since 12063 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 20.8 KB
RevLine 
1#include "StreamTransport.hpp"
2
3// ariba
4#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
5
6// boost-adaption
7#include "ariba/utility/transport/rfcomm/bluetooth_rfcomm.hpp"
8
9// boost
10#include <boost/foreach.hpp>
11#include <boost/array.hpp>
12#include <boost/asio/ip/address.hpp>
13
14// interface discovery for link-local destinations (tcp-only)
15#include <ifaddrs.h>
16
17
18namespace ariba {
19namespace transport {
20
21//use_logging_cpp(StreamTransport<T>)
22
23#ifdef HAVE_LIBBLUETOOTH
24 using boost::asio::bluetooth::rfcomm;
25#endif /* HAVE_LIBBLUETOOTH */
26
27using namespace ariba::addressing2;
28using ariba::utility::LinkID;
29
30using boost::asio::ip::tcp;
31using boost::asio::ip::address_v6;
32using boost::shared_ptr;
33
34typedef boost::mutex::scoped_lock unique_lock;
35
36
37template <class T>
38StreamTransport<T>::StreamTransport( const typename T::endpoint& endp ) :
39 listener(NULL),
40 acceptor(u_io_service.get_asio_io_service(), endp)
41{
42}
43
44template <class T>
45StreamTransport<T>::~StreamTransport(){}
46
47template <class T>
48void StreamTransport<T>::start()
49{
50 // open server socket
51 accept();
52
53 u_io_service.start();
54}
55
56
57template <class T>
58void StreamTransport<T>::stop()
59{
60 acceptor.close();
61
62 u_io_service.stop();
63}
64
65
66/* see header file for comments */
67template <class T>
68void StreamTransport<T>::send(
69 const typename T::endpoint& dest_addr,
70 reboost::message_t message,
71 uint8_t priority)
72{
73 ConnPtr conn;
74 bool need_to_connect = false;
75
76 {
77 unique_lock lock(connections_lock);
78
79 // try to find existing connection to this endpoint
80 typename ConnectionMap::iterator it = connections.find(dest_addr);
81
82 // BRANCH: create new connection
83 if (it == connections.end())
84 {
85 ConnPtr tmp_ptr(
86 new StreamConnection(
87 u_io_service.get_asio_io_service(),
88 this->shared_from_this() )
89 );
90 conn = tmp_ptr;
91
92 // save partner endpoint
93 conn->partner_endpoint = dest_addr;
94
95 // Note: starting the send is the obligation of the connect_handler
96 // (avoids trying to send while not connected yet)
97 conn->sending = true;
98 need_to_connect = true;
99
100 typename ConnectionMap::value_type item(dest_addr, conn);
101 connections.insert(item);
102
103 }
104 // BRANCH: use existing connection
105 else
106 {
107 conn = it->second;
108 }
109 }
110
111
112 // * the actual send *
113 conn->enqueue_for_sending(message, priority);
114
115 // if new connection connect to the other party
116 if ( need_to_connect )
117 {
118 conn->sock.async_connect(
119 dest_addr,
120 boost::bind(
121 &StreamConnection::async_connect_handler,
122 conn,
123 boost::asio::placeholders::error));
124 }
125}
126
127
128// TODO is this a private function? ... and still needed?
129/* see header file for comments */
130template <class T>
131void StreamTransport<T>::send(
132 const EndpointPtr remote,
133 reboost::message_t message,
134 uint8_t priority)
135{
136 if ( remote->get_category() == endpoint_category::TCPIP )
137 {
138 const addressing2::tcpip_endpoint* endp =
139 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
140
141 send(endp->to_asio(), message, priority);
142 }
143
144 // else
145 // TODO what now?
146}
147
148
149template <class T>
150void StreamTransport<T>::register_listener( transport_listener* listener )
151{
152 this->listener = listener;
153}
154
155
156// XXX DEPRECATED
157template <class T>
158void StreamTransport<T>::terminate( const EndpointPtr remote )
159{
160 if ( remote->get_category() == endpoint_category::TCPIP )
161 {
162 const addressing2::tcpip_endpoint* endp =
163 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
164
165
166 this->terminate(endp->to_asio());
167 }
168}
169
170template <class T>
171void StreamTransport<T>::terminate( const typename T::endpoint& remote )
172{
173 ConnPtr conn;
174
175 // find and forget connection
176 {
177 unique_lock lock(connections_lock);
178
179 typename ConnectionMap::iterator it = connections.find(remote);
180 if (it == connections.end())
181 {
182 return;
183 }
184
185 conn = it->second;
186
187 // prevent upper layers from using this link
188 conn->valid = false;
189
190 connections.erase(it);
191 }
192
193 // XXX aktuell
194// cout << "/// MARIO: TCP/IP TERMINATE: " << conn->partner_endpoint << endl;
195
196 // notify higher layers
197 if ( listener )
198 {
199 listener->connection_terminated(conn);
200 }
201
202
203 // XXX debug aktuell
204// cout << "/// MARIO Open connections:" << endl;
205// for ( typename ConnectionMap::iterator it = connections.begin(); it != connections.end(); ++it )
206// {
207// cout << " CONNECTION: " << it->second->local_endpoint << " <---> " << it->second->partner_endpoint << endl;
208// cout << " used by: " << endl;
209//
210// int usecount = 0;
211// ConnPtr xx;
212// std::vector<LinkID*> links = it->second->get_communication_links();
213// for ( std::vector<LinkID*>::iterator it2 = links.begin(); it2 != links.end(); ++it2 )
214// {
215// cout << " - " << *it2 << endl;
216// usecount++;
217// }
218// if ( usecount == 0 )
219// {
220// cout << " ---> NOBODY !!" << endl;
221// }
222// }
223// cout << "/// -------" << endl;
224
225
226 // close connection
227 boost::system::error_code ec;
228 conn->sock.shutdown(T::socket::shutdown_both, ec);
229 conn->sock.close(ec);
230}
231
232
233/* private */
234template <class T>
235void StreamTransport<T>::accept()
236{
237 // create new connection object
238 ConnPtr conn(
239 new StreamConnection(
240 u_io_service.get_asio_io_service(),
241 this->shared_from_this()
242 )
243 );
244
245 // wait for incoming connection
246 acceptor.async_accept(
247 conn->sock,
248 boost::bind(&self::async_accept_handler,
249 this->shared_from_this(),
250 conn,
251 boost::asio::placeholders::error)
252 );
253}
254
255template <class T>
256void StreamTransport<T>::async_accept_handler(ConnPtr conn, const error_code& error)
257{
258 if ( ! error )
259 {
260 // save partner endpoint
261 conn->partner_endpoint = conn->sock.remote_endpoint();
262
263 {
264 unique_lock lock(connections_lock);
265
266 typename ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
267 connections.insert(item);
268 }
269
270 // read
271 conn->listen();
272 }
273
274 // accept further connections
275 accept();
276}
277
278
279
280/*------------------
281 | specializations |
282 ------------------*/
283/* TCP */
284template <>
285void StreamTransport<tcp>::send(
286 const addressing2::const_EndpointSetPtr endpoints,
287 reboost::message_t message,
288 uint8_t priority )
289{
290 // network interfaces scope_ids, for link-local connections (lazy initialization)
291 vector<uint64_t> scope_ids;
292
293 // send a message to each combination of address-address and port
294 BOOST_FOREACH( const TcpIP_EndpointPtr address, endpoints->get_tcpip_endpoints() )
295// vector<TcpIP_EndpointPtr> endpoint_vec = endpoints->get_tcpip_endpoints();
296// for ( vector<TcpIP_EndpointPtr>::iterator it = endpoint_vec.begin(); it != endpoint_vec.end(); ++it )
297 {
298 tcp::endpoint endp = address->to_asio();
299
300 // special treatment for link local addresses
301 // ---> send over all (suitable) interfaces
302 if ( endp.address().is_v6() )
303 {
304 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
305
306 if ( v6_addr.is_link_local() )
307 {
308 // initialize scope_ids
309 if ( scope_ids.size() == 0 )
310 scope_ids = get_interface_scope_ids();
311
312 BOOST_FOREACH ( uint64_t id, scope_ids )
313 {
314 v6_addr.scope_id(id);
315 endp.address(v6_addr);
316
317// logging_debug("------> SEND TO (link-local): " << endp);
318 // * send *
319 send(endp, message, priority);
320 }
321 }
322
323 continue;
324 }
325
326 // * send *
327 send(endp, message, priority);
328 }
329}
330
331
332/* RFCOMM */
333#ifdef HAVE_LIBBLUETOOTH
334
335// TODO
336
337// template <>
338// void StreamTransport<rfcomm>::send(
339// const endpoint_set& endpoints,
340// reboost::message_t message,
341// uint8_t priority )
342// {
343// // send a message to each combination of address-address and port
344// BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
345// BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
346// rfcomm::endpoint endp(mac.bluetooth(), channel.value());
347//
348// // * send *
349// send(endp, message, priority);
350// }
351// }
352// }
353
354#endif /* HAVE_LIBBLUETOOTH */
355
356
357
358/*****************
359 ** inner class **
360 *****************/
361
362template <class T>
363StreamTransport<T>::StreamConnection::StreamConnection(boost::asio::io_service & io_service, StreamTransportPtr parent) :
364 sock(io_service),
365 valid(true),
366 parent(parent),
367 out_queues(8), //TODO How much priorities shall we have?
368 sending(false)
369{
370 header.length = 0;
371}
372
373/*-------------------------------------------
374 | implement transport_connection interface |
375 -------------------------------------------*/
376template <class T>
377bool StreamTransport<T>::StreamConnection::send(
378 reboost::message_t message,
379 uint8_t priority)
380{
381 if ( ! valid )
382 {
383 // XXX aktuell
384// cout << "/// MARIO: USED INVALID LINK: " << this->partner_endpoint << endl;
385
386 return false;
387 }
388
389 // * enqueue data *
390 enqueue_for_sending(message, priority);
391
392 return true;
393}
394
395
396template <class T>
397EndpointPtr StreamTransport<T>::StreamConnection::getLocalEndpoint()
398{
399 EndpointPtr ret(new addressing2::tcpip_endpoint(local_endpoint));
400
401 return ret;
402}
403
404
405template <class T>
406EndpointPtr StreamTransport<T>::StreamConnection::getRemoteEndpoint()
407{
408 EndpointPtr ret(new addressing2::tcpip_endpoint(partner_endpoint));
409
410 return ret;
411}
412
413template <class T>
414void StreamTransport<T>::StreamConnection::register_communication_link(LinkID* link)
415{
416 if ( ! link )
417 return;
418
419 // add link
420 communication_links.push_back(link);
421}
422
423template <class T>
424void StreamTransport<T>::StreamConnection::unregister_communication_link(LinkID* link)
425{
426 if ( ! link )
427 return;
428
429
430 // remove link
431 {
432 std::vector<LinkID*>::iterator it = communication_links.begin();
433
434 while ( it != communication_links.end() )
435 {
436 if ( (*it) == link )
437 {
438 it = communication_links.erase(it);
439 }
440 else
441 {
442 ++it;
443 }
444 }
445 }
446
447 // this connection is no longer used by any link
448 if ( communication_links.empty() )
449 {
450 //XXX
451// cout << "communication_links.empty() " << getLocalEndpoint()->to_string() << " - " << getRemoteEndpoint()->to_string() << endl;
452
453 // terminate connection
454 this->terminate(); // TODO aktuell
455
456 /*
457 * TODO racecondition:
458 * When receiving a link request, the connection could closed
459 * before the request is handled.
460 *
461 * ---> Maybe wait a timeout before actually terminate the connection.
462 * (e.g. record last-used time:
463 * if last used > timeout and communication_links.empty()
464 * then terminate the connection)
465 */
466 }
467}
468
469template <class T>
470std::vector<LinkID*> StreamTransport<T>::StreamConnection::get_communication_links()
471{
472 return communication_links;
473}
474
475
476template <class T>
477void StreamTransport<T>::StreamConnection::terminate()
478{
479 parent->terminate(partner_endpoint);
480}
481
482
483/*------------------------------
484 | things we defined ourselves |
485 ------------------------------*/
486template <class T>
487void StreamTransport<T>::StreamConnection::async_connect_handler(const error_code& error)
488{
489 if (error)
490 {
491 parent->terminate(partner_endpoint);
492
493 return;
494 }
495
496 // save local endpoint
497 local_endpoint = sock.local_endpoint();
498
499 // Note: sending has to be true at this point
500 send_next_package();
501
502 listen();
503}
504
505
506template <class T>
507void StreamTransport<T>::StreamConnection::listen()
508{
509 boost::asio::async_read(
510 this->sock,
511 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
512 boost::bind(
513 &StreamTransport<T>::StreamConnection::async_read_header_handler,
514 this->shared_from_this(),
515 boost::asio::placeholders::error,
516 boost::asio::placeholders::bytes_transferred
517 )
518 );
519}
520
521
522template <class T>
523void StreamTransport<T>::StreamConnection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
524{
525 if (error)
526 {
527 parent->terminate(partner_endpoint);
528
529 return;
530 }
531
532 // sanity checks
533 // NOTE: max size 8k (may be changed later..)
534 if ( header.length == 0 || header.length > 8192 )
535 {
536 parent->terminate(partner_endpoint);
537 }
538
539
540 // new buffer for the new packet
541 buffy = shared_buffer_t(header.length);
542
543 // * read data *
544 boost::asio::async_read(
545 this->sock,
546 boost::asio::buffer(buffy.mutable_data(), buffy.size()),
547 boost::bind(
548 &StreamTransport<T>::StreamConnection::async_read_data_handler,
549 this->shared_from_this(),
550 boost::asio::placeholders::error,
551 boost::asio::placeholders::bytes_transferred
552 )
553 );
554}
555
556template <class T>
557void StreamTransport<T>::StreamConnection::async_read_data_handler(
558 const error_code& error, size_t bytes_transferred)
559{
560 if (error)
561 {
562 parent->terminate(partner_endpoint);
563
564 return;
565 }
566
567 if ( parent->listener )
568 parent->listener->receive_message(this->shared_from_this(), buffy);
569
570 buffy = shared_buffer_t();
571 listen();
572}
573
574/* see header file for comments */
575template <class T>
576void StreamTransport<T>::StreamConnection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
577{
578 if ( error )
579 {
580 // remove this connection
581 parent->terminate(partner_endpoint);
582
583 return;
584 }
585
586 send_next_package();
587}
588
589
590
591template <class T>
592void StreamTransport<T>::StreamConnection::enqueue_for_sending(Packet packet, uint8_t priority)
593{
594 bool restart_sending = false;
595
596 // enqueue packet [locked]
597 {
598 unique_lock(out_queues_lock);
599
600 assert( priority < out_queues.size() );
601 out_queues[priority].push(packet);
602
603 if ( ! sending )
604 {
605 restart_sending = true;
606 sending = true;
607 }
608 }
609
610 // if sending was stopped, we have to restart it here
611 if ( restart_sending )
612 {
613 send_next_package();
614 }
615}
616
617/* see header file for comments */
618template <class T>
619void StreamTransport<T>::StreamConnection::send_next_package()
620{
621 Packet packet;
622 bool found = false;
623
624 // find packet with highest priority [locked]
625 {
626 unique_lock(out_queues_lock);
627
628 for ( vector<OutQueue>::iterator it = out_queues.begin();
629 it != out_queues.end(); it++ )
630 {
631 if ( !it->empty() )
632 {
633 packet = it->front();
634 it->pop();
635 found = true;
636
637 break;
638 }
639 }
640
641 // no packets waiting --> stop sending
642 if ( ! found )
643 {
644 sending = false;
645 }
646 }
647
648 // * send *
649 if ( found )
650 {
651 reboost::shared_buffer_t header_buf(sizeof(header_t));
652 header_t* header = (header_t*)(header_buf.mutable_data());
653 header->length = packet.size();
654
655 packet.push_front(header_buf);
656
657 // "convert" message to asio buffer sequence
658 vector<boost::asio::const_buffer> send_sequence(packet.length());
659 for ( int i=0; i < packet.length(); i++ )
660 {
661 shared_buffer_t b = packet.at(i);
662 send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
663 }
664
665 // * async write *
666 boost::asio::async_write(
667 this->sock,
668 send_sequence,
669 boost::bind(
670 &StreamTransport<T>::StreamConnection::async_write_handler,
671 this->shared_from_this(),
672 packet, // makes sure our shared pointer lives long enough ;-)
673 boost::asio::placeholders::error,
674 boost::asio::placeholders::bytes_transferred)
675 );
676 }
677}
678/*********************
679 ** [ inner class ] **
680 *********************/
681
682
683
684// explicitly tell the compiler to create a »tcp« (and »rfcomm«) specialization
685// --> (needed since hpp and cpp are separated)
686template class StreamTransport<boost::asio::ip::tcp>;
687
688#ifdef HAVE_LIBBLUETOOTH
689 template class StreamTransport<rfcomm>;
690#endif /* HAVE_LIBBLUETOOTH */
691
692
693
694/////////////////////////////////////////////////////////////////////////////////////
695
696
697// XXX testing
698///**
699// * Conversion between ASIO Adresses and ARIBA adresses
700// */
701///* TCP */
702//template <>
703//inline typename tcp::endpoint convert_address<tcp>( const address_v* address )
704//{
705// tcpip_endpoint endpoint = *address;
706//
707// return typename tcp::endpoint(
708// endpoint.address().asio(), endpoint.port().value()
709// );
710//}
711//
712//template <>
713//inline EndpointPtr convert_address<tcp>(const typename tcp::endpoint& endpoint)
714//{
715// ip_address address;
716// address.asio(endpoint.address());
717// tcp_port_address port;
718// port.value(endpoint.port());
719//
720//// new tcpip_endpoint(address, port);
721// tcpip_endpoint xx;
722// address_vf yy;
723// address_v* zz = yy->clone();
724// address_v::shared_ptr endp(zz); // XXX
725//
726// return endp;
727//}
728//
729///* RFCOMM */
730//#ifdef HAVE_LIBBLUETOOTH
731// template <>
732// inline typename rfcomm::endpoint convert_address<rfcomm>( const address_v* address )
733// {
734// rfcomm_endpoint endpoint = *address;
735//
736// return rfcomm::endpoint(
737// endpoint.mac().bluetooth(), endpoint.channel().value()
738// );
739// }
740//
741// template <>
742// inline address_v::shared_ptr convert_address<rfcomm>(const typename rfcomm::endpoint& endpoint)
743// {
744// mac_address mac;
745// mac.bluetooth(endpoint.address());
746// rfcomm_channel_address channel;
747// channel.value(endpoint.channel());
748//
749// address_v::shared_ptr endp((ariba::addressing::address_v*) new rfcomm_endpoint(mac, channel));
750//
751// return endp;
752// }
753//#endif /* HAVE_LIBBLUETOOTH */
754
755
756
757/////////////////////////////////////////////////////////////////////////////////////
758
759
/*
 * Get Ethernet scope ids (for link-local)
 *
 * Walks the list returned by getifaddrs() and collects the interface index
 * (if_nametoindex) of every interface that
 *   - has "eth" or "wlan" somewhere in its name, and
 *   - carries an IPv6 link-local address.
 *
 * Each index is reported once, even if the interface has several
 * link-local addresses; interfaces whose index cannot be resolved
 * (if_nametoindex returns 0) are skipped.
 *
 * Returns an empty vector if getifaddrs() fails.
 */
std::vector<uint64_t> get_interface_scope_ids()
{
    std::vector<uint64_t> scope_ids;

    struct ifaddrs* iface_list = NULL;
    if ( getifaddrs(&iface_list) != 0 )
    {
        return scope_ids;
    }

    for ( struct ifaddrs* iface = iface_list; iface != NULL; iface = iface->ifa_next )
    {
        // ignore entries without an address or a name
        const struct sockaddr* addr = iface->ifa_addr;
        if ( addr == NULL || iface->ifa_name == NULL )
        {
            continue;
        }

        // only IPv6 addresses can be link-local in the sense needed here
        if ( addr->sa_family != AF_INET6 )
        {
            continue;
        }

        // only use ethX and wlanX devices
        const std::string device(iface->ifa_name);
        if ( device.find("eth") == std::string::npos &&
             device.find("wlan") == std::string::npos )
        {
            continue;
        }

        // test the binary address directly (no detour over strings)
        const struct sockaddr_in6* addr6 =
                reinterpret_cast<const struct sockaddr_in6*>(addr);
        if ( ! IN6_IS_ADDR_LINKLOCAL(&addr6->sin6_addr) )
        {
            continue;
        }

        // 0 means "no such interface" and is not a usable scope id
        const uint64_t scope_id = if_nametoindex(iface->ifa_name);
        if ( scope_id == 0 )
        {
            continue;
        }

        // * append the scope_id to the return vector (once) *
        bool already_known = false;
        for ( size_t k = 0; k < scope_ids.size(); ++k )
        {
            if ( scope_ids[k] == scope_id )
            {
                already_known = true;
                break;
            }
        }

        if ( ! already_known )
        {
            scope_ids.push_back(scope_id);
        }
    }

    freeifaddrs(iface_list);

    return scope_ids;
}
812
813
814}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.