close Warning: Can't use blame annotator:
No changeset 10924 in the repository

source: source/ariba/utility/transport/StreamTransport/StreamTransport.cpp@ 12775

Last change on this file since 12775 was 12774, checked in by hock@…, 11 years ago

StreamTransport bug fixed!!

[ Back to normal. :-) ]

File size: 21.2 KB
RevLine 
1#include "StreamTransport.hpp"
2
3// ariba
4#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
5
6// boost-adaption
7#include "ariba/utility/transport/rfcomm/bluetooth_rfcomm.hpp"
8
9// boost
10#include <boost/foreach.hpp>
11#include <boost/array.hpp>
12#include <boost/asio/ip/address.hpp>
13
14// interface discovery for link-local destinations (tcp-only)
15#include <ifaddrs.h>
16
17
18namespace ariba {
19namespace transport {
20
21//use_logging_cpp(StreamTransport<T>)
22
23#ifdef HAVE_LIBBLUETOOTH
24 using boost::asio::bluetooth::rfcomm;
25#endif /* HAVE_LIBBLUETOOTH */
26
27using namespace ariba::addressing2;
28using ariba::utility::LinkID;
29
30using boost::asio::ip::tcp;
31using boost::asio::ip::address_v6;
32using boost::shared_ptr;
33
34typedef boost::mutex::scoped_lock unique_lock;
35
36
37template <class T>
38StreamTransport<T>::StreamTransport( const typename T::endpoint& endp ) :
39 listener(NULL),
40 acceptor(u_io_service.get_asio_io_service(), endp)
41{
42}
43
44template <class T>
45StreamTransport<T>::~StreamTransport(){}
46
47template <class T>
48void StreamTransport<T>::start()
49{
50 // open server socket
51 accept();
52
53 u_io_service.start();
54}
55
56
57template <class T>
58void StreamTransport<T>::stop()
59{
60 acceptor.close();
61
62 u_io_service.stop();
63}
64
65
66/* see header file for comments */
67template <class T>
68void StreamTransport<T>::send(
69 const typename T::endpoint& dest_addr,
70 reboost::message_t message,
71 uint8_t priority)
72{
73 ConnPtr conn;
74 bool need_to_connect = false;
75
76 {
77 unique_lock lock(connections_lock);
78
79 // try to find existing connection to this endpoint
80 typename ConnectionMap::iterator it = connections.find(dest_addr);
81
82 // BRANCH: create new connection
83 if (it == connections.end())
84 {
85 ConnPtr tmp_ptr(
86 new StreamConnection(
87 u_io_service.get_asio_io_service(),
88 this->shared_from_this() )
89 );
90 conn = tmp_ptr;
91
92 // save partner endpoint
93 conn->partner_endpoint = dest_addr;
94
95 // Note: starting the send is the obligation of the connect_handler
96 // (avoids trying to send while not connected yet)
97 conn->sending = true;
98 need_to_connect = true;
99
100 typename ConnectionMap::value_type item(dest_addr, conn);
101 connections.insert(item);
102
103 }
104 // BRANCH: use existing connection
105 else
106 {
107 conn = it->second;
108 }
109 }
110
111
112 // * the actual send *
113 conn->enqueue_for_sending(message, priority);
114
115 // if new connection connect to the other party
116 if ( need_to_connect )
117 {
118 conn->sock.async_connect(
119 dest_addr,
120 boost::bind(
121 &StreamConnection::async_connect_handler,
122 conn,
123 boost::asio::placeholders::error));
124 }
125}
126
127
128// TODO is this a private function? ... and still needed?
129/* see header file for comments */
130template <class T>
131void StreamTransport<T>::send(
132 const EndpointPtr remote,
133 reboost::message_t message,
134 uint8_t priority)
135{
136 if ( remote->get_category() == endpoint_category::TCPIP )
137 {
138 const addressing2::tcpip_endpoint* endp =
139 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
140
141 send(endp->to_asio(), message, priority);
142 }
143
144 // else
145 // TODO what now?
146}
147
148
149template <class T>
150void StreamTransport<T>::register_listener( transport_listener* listener )
151{
152 this->listener = listener;
153}
154
155
156// XXX DEPRECATED
157template <class T>
158void StreamTransport<T>::terminate( const EndpointPtr remote )
159{
160 if ( remote->get_category() == endpoint_category::TCPIP )
161 {
162 const addressing2::tcpip_endpoint* endp =
163 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
164
165
166 this->terminate(endp->to_asio());
167 }
168}
169
170template <class T>
171void StreamTransport<T>::terminate( const typename T::endpoint& remote )
172{
173 ConnPtr conn;
174
175 // find and forget connection
176 {
177 unique_lock lock(connections_lock);
178
179 typename ConnectionMap::iterator it = connections.find(remote);
180 if (it == connections.end())
181 {
182 return;
183 }
184
185 conn = it->second;
186
187 // prevent upper layers from using this link
188 conn->valid = false;
189
190 connections.erase(it);
191 }
192
193 // XXX aktuell
194// cout << "/// MARIO: TCP/IP TERMINATE: " << conn->partner_endpoint << endl;
195
196 // notify higher layers
197 if ( listener )
198 {
199 listener->connection_terminated(conn);
200 }
201
202
203 // XXX debug aktuell
204// cout << "/// MARIO Open connections:" << endl;
205// for ( typename ConnectionMap::iterator it = connections.begin(); it != connections.end(); ++it )
206// {
207// cout << " CONNECTION: " << it->second->local_endpoint << " <---> " << it->second->partner_endpoint << endl;
208// cout << " used by: " << endl;
209//
210// int usecount = 0;
211// ConnPtr xx;
212// std::vector<LinkID*> links = it->second->get_communication_links();
213// for ( std::vector<LinkID*>::iterator it2 = links.begin(); it2 != links.end(); ++it2 )
214// {
215// cout << " - " << *it2 << endl;
216// usecount++;
217// }
218// if ( usecount == 0 )
219// {
220// cout << " ---> NOBODY !!" << endl;
221// }
222// }
223// cout << "/// -------" << endl;
224
225
226 // close connection
227 boost::system::error_code ec;
228 conn->sock.shutdown(T::socket::shutdown_both, ec);
229 conn->sock.close(ec);
230}
231
232
233/* private */
234template <class T>
235void StreamTransport<T>::accept()
236{
237 // create new connection object
238 ConnPtr conn(
239 new StreamConnection(
240 u_io_service.get_asio_io_service(),
241 this->shared_from_this()
242 )
243 );
244
245 // wait for incoming connection
246 acceptor.async_accept(
247 conn->sock,
248 boost::bind(&self::async_accept_handler,
249 this->shared_from_this(),
250 conn,
251 boost::asio::placeholders::error)
252 );
253}
254
255template <class T>
256void StreamTransport<T>::async_accept_handler(ConnPtr conn, const error_code& error)
257{
258 if ( ! error )
259 {
260 // save partner endpoint
261 conn->partner_endpoint = conn->sock.remote_endpoint();
262
263 {
264 unique_lock lock(connections_lock);
265
266 typename ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
267 connections.insert(item);
268 }
269
270 // read
271 conn->listen();
272 }
273
274 // accept further connections
275 accept();
276}
277
278
279
280/*------------------
281 | specializations |
282 ------------------*/
283/* TCP */
284template <>
285void StreamTransport<tcp>::send(
286 const addressing2::const_EndpointSetPtr endpoints,
287 reboost::message_t message,
288 uint8_t priority )
289{
290 // network interfaces scope_ids, for link-local connections (lazy initialization)
291 vector<uint64_t> scope_ids;
292
293 // send a message to each combination of address-address and port
294 BOOST_FOREACH( const TcpIP_EndpointPtr address, endpoints->get_tcpip_endpoints() )
295// vector<TcpIP_EndpointPtr> endpoint_vec = endpoints->get_tcpip_endpoints();
296// for ( vector<TcpIP_EndpointPtr>::iterator it = endpoint_vec.begin(); it != endpoint_vec.end(); ++it )
297 {
298 tcp::endpoint endp = address->to_asio();
299
300 // special treatment for link local addresses
301 // ---> send over all (suitable) interfaces
302 if ( endp.address().is_v6() )
303 {
304 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
305
306 if ( v6_addr.is_link_local() )
307 {
308 // initialize scope_ids
309 if ( scope_ids.size() == 0 )
310 scope_ids = get_interface_scope_ids();
311
312 BOOST_FOREACH ( uint64_t id, scope_ids )
313 {
314 v6_addr.scope_id(id);
315 endp.address(v6_addr);
316
317// logging_debug("------> SEND TO (link-local): " << endp);
318 // * send *
319 send(endp, message, priority);
320 }
321 }
322
323 continue;
324 }
325
326 // * send *
327 send(endp, message, priority);
328 }
329}
330
331
332/* RFCOMM */
333#ifdef HAVE_LIBBLUETOOTH
334
335// TODO
336
337// template <>
338// void StreamTransport<rfcomm>::send(
339// const endpoint_set& endpoints,
340// reboost::message_t message,
341// uint8_t priority )
342// {
343// // send a message to each combination of address-address and port
344// BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
345// BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
346// rfcomm::endpoint endp(mac.bluetooth(), channel.value());
347//
348// // * send *
349// send(endp, message, priority);
350// }
351// }
352// }
353
354#endif /* HAVE_LIBBLUETOOTH */
355
356
357
358/*****************
359 ** inner class **
360 *****************/
361
362template <class T>
363StreamTransport<T>::StreamConnection::StreamConnection(boost::asio::io_service & io_service, StreamTransportPtr parent) :
364 sock(io_service),
365 valid(true),
366 parent(parent),
367 out_queues(8), //TODO How much priorities shall we have?
368 sending(false)
369{
370 header.length = 0;
371}
372
373/*-------------------------------------------
374 | implement transport_connection interface |
375 -------------------------------------------*/
376template <class T>
377bool StreamTransport<T>::StreamConnection::send(
378 reboost::message_t message,
379 uint8_t priority)
380{
381 if ( ! valid )
382 {
383 // XXX aktuell
384// cout << "/// MARIO: USED INVALID LINK: " << this->partner_endpoint << endl;
385
386 return false;
387 }
388
389 // * enqueue data *
390 enqueue_for_sending(message, priority);
391
392 return true;
393}
394
395
396template <class T>
397EndpointPtr StreamTransport<T>::StreamConnection::getLocalEndpoint()
398{
399 EndpointPtr ret(new addressing2::tcpip_endpoint(local_endpoint));
400
401 return ret;
402}
403
404
405template <class T>
406EndpointPtr StreamTransport<T>::StreamConnection::getRemoteEndpoint()
407{
408 EndpointPtr ret(new addressing2::tcpip_endpoint(partner_endpoint));
409
410 return ret;
411}
412
413template <class T>
414void StreamTransport<T>::StreamConnection::register_communication_link(LinkID* link)
415{
416 if ( ! link )
417 return;
418
419 // add link
420 communication_links.push_back(link);
421}
422
423template <class T>
424void StreamTransport<T>::StreamConnection::unregister_communication_link(LinkID* link)
425{
426 if ( ! link )
427 return;
428
429
430 // remove link
431 {
432 std::vector<LinkID*>::iterator it = communication_links.begin();
433
434 while ( it != communication_links.end() )
435 {
436 if ( (*it) == link )
437 {
438 it = communication_links.erase(it);
439 }
440 else
441 {
442 ++it;
443 }
444 }
445 }
446
447 // this connection is no longer used by any link
448 if ( communication_links.empty() )
449 {
450 //XXX
451// cout << "communication_links.empty() " << getLocalEndpoint()->to_string() << " - " << getRemoteEndpoint()->to_string() << endl;
452
453 // terminate connection
454 this->terminate(); // TODO aktuell
455
456 /*
457 * TODO racecondition:
458 * When receiving a link request, the connection could closed
459 * before the request is handled.
460 *
461 * ---> Maybe wait a timeout before actually terminate the connection.
462 * (e.g. record last-used time:
463 * if last used > timeout and communication_links.empty()
464 * then terminate the connection)
465 */
466 }
467}
468
469template <class T>
470std::vector<LinkID*> StreamTransport<T>::StreamConnection::get_communication_links()
471{
472 return communication_links;
473}
474
475
476template <class T>
477void StreamTransport<T>::StreamConnection::terminate()
478{
479 parent->terminate(partner_endpoint);
480}
481
482
483/*------------------------------
484 | things we defined ourselves |
485 ------------------------------*/
486template <class T>
487void StreamTransport<T>::StreamConnection::async_connect_handler(const error_code& error)
488{
489 if (error)
490 {
491 parent->terminate(partner_endpoint);
492
493 return;
494 }
495
496 // save local endpoint
497 local_endpoint = sock.local_endpoint();
498
499 // Note: sending has to be true at this point
500 send_next_package();
501
502 listen();
503}
504
505
506template <class T>
507void StreamTransport<T>::StreamConnection::listen()
508{
509 boost::asio::async_read(
510 this->sock,
511 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
512 boost::bind(
513 &StreamTransport<T>::StreamConnection::async_read_header_handler,
514 this->shared_from_this(),
515 boost::asio::placeholders::error,
516 boost::asio::placeholders::bytes_transferred
517 )
518 );
519}
520
521
522template <class T>
523void StreamTransport<T>::StreamConnection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
524{
525 if (error)
526 {
527 parent->terminate(partner_endpoint);
528
529 return;
530 }
531
532 // sanity checks
533 // NOTE: max size 8k (may be changed later..)
534 if ( header.length == 0 || header.length > 8192 )
535 {
536 parent->terminate(partner_endpoint);
537 }
538
539
540 // new buffer for the new packet
541 buffy = shared_buffer_t(header.length);
542
543 // * read data *
544 boost::asio::async_read(
545 this->sock,
546 boost::asio::buffer(buffy.mutable_data(), buffy.size()),
547 boost::bind(
548 &StreamTransport<T>::StreamConnection::async_read_data_handler,
549 this->shared_from_this(),
550 boost::asio::placeholders::error,
551 boost::asio::placeholders::bytes_transferred
552 )
553 );
554}
555
556template <class T>
557void StreamTransport<T>::StreamConnection::async_read_data_handler(
558 const error_code& error, size_t bytes_transferred)
559{
560 if (error)
561 {
562 parent->terminate(partner_endpoint);
563
564 return;
565 }
566
567 if ( parent->listener )
568 parent->listener->receive_message(this->shared_from_this(), buffy);
569
570 buffy = shared_buffer_t();
571 listen();
572}
573
574/* see header file for comments */
575template <class T>
576void StreamTransport<T>::StreamConnection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
577{
578 if ( error )
579 {
580 // remove this connection
581 parent->terminate(partner_endpoint);
582
583 return;
584 }
585
586 send_next_package();
587}
588
589
590
591template <class T>
592void StreamTransport<T>::StreamConnection::enqueue_for_sending(Packet packet, uint8_t priority)
593{
594 bool restart_sending = false;
595
596 // debugging --> copy message (instead of zero copy)
597// reboost::shared_buffer_t buff = packet.linearize();
598// reboost::message_t msg;
599// msg.push_back(buff);
600// assert ( msg.MAGIC_NUMBER == 421337 );
601 // [ debugging ]
602
603
604 // enqueue packet [locked]
605 {
606 unique_lock lock(out_queues_lock);
607
608 assert ( this->valid );
609 assert( priority < out_queues.size() );
610
611 // * enqueue *
612 out_queues[priority].push(packet);
613
614 if ( ! sending )
615 {
616 restart_sending = true;
617 sending = true;
618 }
619 }
620
621 // if sending was stopped, we have to restart it here
622 if ( restart_sending )
623 {
624 send_next_package();
625 }
626}
627
628/* see header file for comments */
629template <class T>
630void StreamTransport<T>::StreamConnection::send_next_package()
631{
632 Packet packet;
633 bool found = false;
634
635 // I'm not sure if this can actually happen.. But let's be on the save side, here.
636 if ( ! this->valid )
637 {
638 this->sending = false;
639 return;
640 }
641
642 // find packet with highest priority [locked]
643 {
644 unique_lock lock(out_queues_lock);
645
646 for ( vector<OutQueue>::iterator it = out_queues.begin();
647 it != out_queues.end(); it++ )
648 {
649 if ( !it->empty() )
650 {
651 packet = it->front();
652 it->pop();
653 found = true;
654
655 break;
656 }
657 }
658
659 // no packets waiting --> stop sending
660 if ( ! found )
661 {
662 sending = false;
663 }
664 }
665
666 // * send *
667 if ( found )
668 {
669 reboost::shared_buffer_t header_buf(sizeof(header_t));
670 header_t* header = (header_t*)(header_buf.mutable_data());
671 header->length = packet.size();
672
673 packet.push_front(header_buf);
674
675 // "convert" message to asio buffer sequence
676 vector<boost::asio::const_buffer> send_sequence(packet.length());
677 for ( int i=0; i < packet.length(); i++ )
678 {
679 shared_buffer_t b = packet.at(i);
680 send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
681 }
682
683 // * async write *
684 boost::asio::async_write(
685 this->sock,
686 send_sequence,
687 boost::bind(
688 &StreamTransport<T>::StreamConnection::async_write_handler,
689 this->shared_from_this(),
690 packet, // makes sure our shared pointer lives long enough ;-)
691 boost::asio::placeholders::error,
692 boost::asio::placeholders::bytes_transferred)
693 );
694 }
695}
696/*********************
697 ** [ inner class ] **
698 *********************/
699
700
701
702// explicitly tell the compiler to create a »tcp« (and »rfcomm«) specialization
703// --> (needed since hpp and cpp are separated)
704template class StreamTransport<boost::asio::ip::tcp>;
705
706#ifdef HAVE_LIBBLUETOOTH
707 template class StreamTransport<rfcomm>;
708#endif /* HAVE_LIBBLUETOOTH */
709
710
711
712/////////////////////////////////////////////////////////////////////////////////////
713
714
715// XXX testing
716///**
717// * Conversion between ASIO Adresses and ARIBA adresses
718// */
719///* TCP */
720//template <>
721//inline typename tcp::endpoint convert_address<tcp>( const address_v* address )
722//{
723// tcpip_endpoint endpoint = *address;
724//
725// return typename tcp::endpoint(
726// endpoint.address().asio(), endpoint.port().value()
727// );
728//}
729//
730//template <>
731//inline EndpointPtr convert_address<tcp>(const typename tcp::endpoint& endpoint)
732//{
733// ip_address address;
734// address.asio(endpoint.address());
735// tcp_port_address port;
736// port.value(endpoint.port());
737//
738//// new tcpip_endpoint(address, port);
739// tcpip_endpoint xx;
740// address_vf yy;
741// address_v* zz = yy->clone();
742// address_v::shared_ptr endp(zz); // XXX
743//
744// return endp;
745//}
746//
747///* RFCOMM */
748//#ifdef HAVE_LIBBLUETOOTH
749// template <>
750// inline typename rfcomm::endpoint convert_address<rfcomm>( const address_v* address )
751// {
752// rfcomm_endpoint endpoint = *address;
753//
754// return rfcomm::endpoint(
755// endpoint.mac().bluetooth(), endpoint.channel().value()
756// );
757// }
758//
759// template <>
760// inline address_v::shared_ptr convert_address<rfcomm>(const typename rfcomm::endpoint& endpoint)
761// {
762// mac_address mac;
763// mac.bluetooth(endpoint.address());
764// rfcomm_channel_address channel;
765// channel.value(endpoint.channel());
766//
767// address_v::shared_ptr endp((ariba::addressing::address_v*) new rfcomm_endpoint(mac, channel));
768//
769// return endp;
770// }
771//#endif /* HAVE_LIBBLUETOOTH */
772
773
774
775/////////////////////////////////////////////////////////////////////////////////////
776
777
778/*
779 * Get Ethernet scope ids (for link-local)
780 */
781vector<uint64_t> get_interface_scope_ids()
782{
783 vector<uint64_t> ret;
784
785 struct ifaddrs* ifaceBuffer = NULL;
786 void* tmpAddrPtr = NULL;
787
788 int ok = getifaddrs( &ifaceBuffer );
789 if( ok != 0 ) return ret;
790
791 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
792
793 // ignore devices that are disabled or have no ip
794 if(i == NULL) continue;
795 struct sockaddr* addr = i->ifa_addr;
796 if (addr==NULL) continue;
797
798 // only use ethX and wlanX devices
799 string device = string(i->ifa_name);
800 if ( (device.find("eth") == string::npos) &&
801 (device.find("wlan") == string::npos) /* &&
802 (device.find("lo") == string::npos) XXX */ )
803 {
804 continue;
805 }
806
807 // only use interfaces with ipv6 link-local addresses
808 if (addr->sa_family == AF_INET6)
809 {
810 // convert address
811 // TODO should be possible without detour over strings
812 char straddr[INET6_ADDRSTRLEN];
813 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
814 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
815
816 address_v6 v6addr = address_v6::from_string(straddr);
817 if ( v6addr.is_link_local() )
818 {
819 // * append the scope_id to the return vector *
820 ret.push_back(if_nametoindex(i->ifa_name));
821 }
822
823 }
824 }
825
826 freeifaddrs(ifaceBuffer);
827
828 return ret;
829}
830
831
832}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.