source: source/ariba/utility/transport/StreamTransport/StreamTransport.cpp@ 12773

Last change on this file since 12773 was 12773, checked in by hock@…, 10 years ago

WARNING! This revision is not intended for productive use!

!! DEBUGGING ONLY !!

Extreme debugging of StreamTransport.
"Typo" in the locking-code, was very hard to track down!

This revision is stored since a lot of debug code was written and should not just be deleted.

File size: 22.0 KB
Line 
1#include "StreamTransport.hpp"
2
3// ariba
4#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
5
6// boost-adaption
7#include "ariba/utility/transport/rfcomm/bluetooth_rfcomm.hpp"
8
9// boost
10#include <boost/foreach.hpp>
11#include <boost/array.hpp>
12#include <boost/asio/ip/address.hpp>
13
14// interface discovery for link-local destinations (tcp-only)
15#include <ifaddrs.h>
16
17
18namespace ariba {
19namespace transport {
20
21//use_logging_cpp(StreamTransport<T>)
22
23#ifdef HAVE_LIBBLUETOOTH
24 using boost::asio::bluetooth::rfcomm;
25#endif /* HAVE_LIBBLUETOOTH */
26
27using namespace ariba::addressing2;
28using ariba::utility::LinkID;
29
30using boost::asio::ip::tcp;
31using boost::asio::ip::address_v6;
32using boost::shared_ptr;
33
34typedef boost::mutex::scoped_lock unique_lock;
35
36
37template <class T>
38StreamTransport<T>::StreamTransport( const typename T::endpoint& endp ) :
39 listener(NULL),
40 acceptor(u_io_service.get_asio_io_service(), endp)
41{
42}
43
44template <class T>
45StreamTransport<T>::~StreamTransport(){}
46
47template <class T>
48void StreamTransport<T>::start()
49{
50 // open server socket
51 accept();
52
53 u_io_service.start();
54}
55
56
57template <class T>
58void StreamTransport<T>::stop()
59{
60 acceptor.close();
61
62 u_io_service.stop();
63}
64
65
66/* see header file for comments */
67template <class T>
68void StreamTransport<T>::send(
69 const typename T::endpoint& dest_addr,
70 reboost::message_t message,
71 uint8_t priority)
72{
73 ConnPtr conn;
74 bool need_to_connect = false;
75
76 {
77 unique_lock lock(connections_lock);
78
79 // try to find existing connection to this endpoint
80 typename ConnectionMap::iterator it = connections.find(dest_addr);
81
82 // BRANCH: create new connection
83 if (it == connections.end())
84 {
85 ConnPtr tmp_ptr(
86 new StreamConnection(
87 u_io_service.get_asio_io_service(),
88 this->shared_from_this() )
89 );
90 conn = tmp_ptr;
91
92 // save partner endpoint
93 conn->partner_endpoint = dest_addr;
94
95 // Note: starting the send is the obligation of the connect_handler
96 // (avoids trying to send while not connected yet)
97 conn->sending = true;
98 need_to_connect = true;
99
100 typename ConnectionMap::value_type item(dest_addr, conn);
101 connections.insert(item);
102
103 }
104 // BRANCH: use existing connection
105 else
106 {
107 conn = it->second;
108 }
109 }
110
111
112 // * the actual send *
113 conn->enqueue_for_sending(message, priority);
114
115 // if new connection connect to the other party
116 if ( need_to_connect )
117 {
118 conn->sock.async_connect(
119 dest_addr,
120 boost::bind(
121 &StreamConnection::async_connect_handler,
122 conn,
123 boost::asio::placeholders::error));
124 }
125}
126
127
128// TODO is this a private function? ... and still needed?
129/* see header file for comments */
130template <class T>
131void StreamTransport<T>::send(
132 const EndpointPtr remote,
133 reboost::message_t message,
134 uint8_t priority)
135{
136 if ( remote->get_category() == endpoint_category::TCPIP )
137 {
138 const addressing2::tcpip_endpoint* endp =
139 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
140
141 send(endp->to_asio(), message, priority);
142 }
143
144 // else
145 // TODO what now?
146}
147
148
149template <class T>
150void StreamTransport<T>::register_listener( transport_listener* listener )
151{
152 this->listener = listener;
153}
154
155
156// XXX DEPRECATED
157template <class T>
158void StreamTransport<T>::terminate( const EndpointPtr remote )
159{
160 if ( remote->get_category() == endpoint_category::TCPIP )
161 {
162 const addressing2::tcpip_endpoint* endp =
163 static_cast<const addressing2::tcpip_endpoint*>(remote.get());
164
165
166 this->terminate(endp->to_asio());
167 }
168}
169
170template <class T>
171void StreamTransport<T>::terminate( const typename T::endpoint& remote )
172{
173 ConnPtr conn;
174
175 // find and forget connection
176 {
177 unique_lock lock(connections_lock);
178
179 typename ConnectionMap::iterator it = connections.find(remote);
180 if (it == connections.end())
181 {
182 return;
183 }
184
185 conn = it->second;
186
187 // prevent upper layers from using this link
188 conn->valid = false;
189
190 connections.erase(it);
191 }
192
193 // XXX aktuell
194// cout << "/// MARIO: TCP/IP TERMINATE: " << conn->partner_endpoint << endl;
195
196 // notify higher layers
197 if ( listener )
198 {
199 listener->connection_terminated(conn);
200 }
201
202
203 // XXX debug aktuell
204// cout << "/// MARIO Open connections:" << endl;
205// for ( typename ConnectionMap::iterator it = connections.begin(); it != connections.end(); ++it )
206// {
207// cout << " CONNECTION: " << it->second->local_endpoint << " <---> " << it->second->partner_endpoint << endl;
208// cout << " used by: " << endl;
209//
210// int usecount = 0;
211// ConnPtr xx;
212// std::vector<LinkID*> links = it->second->get_communication_links();
213// for ( std::vector<LinkID*>::iterator it2 = links.begin(); it2 != links.end(); ++it2 )
214// {
215// cout << " - " << *it2 << endl;
216// usecount++;
217// }
218// if ( usecount == 0 )
219// {
220// cout << " ---> NOBODY !!" << endl;
221// }
222// }
223// cout << "/// -------" << endl;
224
225
226 // close connection
227 boost::system::error_code ec;
228 conn->sock.shutdown(T::socket::shutdown_both, ec);
229 conn->sock.close(ec);
230}
231
232
233/* private */
234template <class T>
235void StreamTransport<T>::accept()
236{
237 // create new connection object
238 ConnPtr conn(
239 new StreamConnection(
240 u_io_service.get_asio_io_service(),
241 this->shared_from_this()
242 )
243 );
244
245 // wait for incoming connection
246 acceptor.async_accept(
247 conn->sock,
248 boost::bind(&self::async_accept_handler,
249 this->shared_from_this(),
250 conn,
251 boost::asio::placeholders::error)
252 );
253}
254
255template <class T>
256void StreamTransport<T>::async_accept_handler(ConnPtr conn, const error_code& error)
257{
258 if ( ! error )
259 {
260 // save partner endpoint
261 conn->partner_endpoint = conn->sock.remote_endpoint();
262
263 {
264 unique_lock lock(connections_lock);
265
266 typename ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn);
267 connections.insert(item);
268 }
269
270 // read
271 conn->listen();
272 }
273
274 // accept further connections
275 accept();
276}
277
278
279
280/*------------------
281 | specializations |
282 ------------------*/
283/* TCP */
284template <>
285void StreamTransport<tcp>::send(
286 const addressing2::const_EndpointSetPtr endpoints,
287 reboost::message_t message,
288 uint8_t priority )
289{
290 // network interfaces scope_ids, for link-local connections (lazy initialization)
291 vector<uint64_t> scope_ids;
292
293 // send a message to each combination of address-address and port
294 BOOST_FOREACH( const TcpIP_EndpointPtr address, endpoints->get_tcpip_endpoints() )
295// vector<TcpIP_EndpointPtr> endpoint_vec = endpoints->get_tcpip_endpoints();
296// for ( vector<TcpIP_EndpointPtr>::iterator it = endpoint_vec.begin(); it != endpoint_vec.end(); ++it )
297 {
298 tcp::endpoint endp = address->to_asio();
299
300 // special treatment for link local addresses
301 // ---> send over all (suitable) interfaces
302 if ( endp.address().is_v6() )
303 {
304 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6();
305
306 if ( v6_addr.is_link_local() )
307 {
308 // initialize scope_ids
309 if ( scope_ids.size() == 0 )
310 scope_ids = get_interface_scope_ids();
311
312 BOOST_FOREACH ( uint64_t id, scope_ids )
313 {
314 v6_addr.scope_id(id);
315 endp.address(v6_addr);
316
317// logging_debug("------> SEND TO (link-local): " << endp);
318 // * send *
319 send(endp, message, priority);
320 }
321 }
322
323 continue;
324 }
325
326 // * send *
327 send(endp, message, priority);
328 }
329}
330
331
332/* RFCOMM */
333#ifdef HAVE_LIBBLUETOOTH
334
335// TODO
336
337// template <>
338// void StreamTransport<rfcomm>::send(
339// const endpoint_set& endpoints,
340// reboost::message_t message,
341// uint8_t priority )
342// {
343// // send a message to each combination of address-address and port
344// BOOST_FOREACH( const mac_address mac, endpoints.bluetooth ) {
345// BOOST_FOREACH( const rfcomm_channel_address channel, endpoints.rfcomm ) {
346// rfcomm::endpoint endp(mac.bluetooth(), channel.value());
347//
348// // * send *
349// send(endp, message, priority);
350// }
351// }
352// }
353
354#endif /* HAVE_LIBBLUETOOTH */
355
356
357
358/*****************
359 ** inner class **
360 *****************/
361
362template <class T>
363StreamTransport<T>::StreamConnection::StreamConnection(boost::asio::io_service & io_service, StreamTransportPtr parent) :
364 sock(io_service),
365 valid(true),
366 parent(parent),
367 out_queues(8), //TODO How much priorities shall we have?
368 sending(false),
369 MAGIC_NUMBER(424242)
370{
371 header.length = 0;
372}
373
374/*-------------------------------------------
375 | implement transport_connection interface |
376 -------------------------------------------*/
377template <class T>
378bool StreamTransport<T>::StreamConnection::send(
379 reboost::message_t message,
380 uint8_t priority)
381{
382 if ( ! valid )
383 {
384 // XXX aktuell
385// cout << "/// MARIO: USED INVALID LINK: " << this->partner_endpoint << endl;
386
387 return false;
388 }
389
390 // * enqueue data *
391 enqueue_for_sending(message, priority);
392
393 return true;
394}
395
396
397template <class T>
398EndpointPtr StreamTransport<T>::StreamConnection::getLocalEndpoint()
399{
400 EndpointPtr ret(new addressing2::tcpip_endpoint(local_endpoint));
401
402 return ret;
403}
404
405
406template <class T>
407EndpointPtr StreamTransport<T>::StreamConnection::getRemoteEndpoint()
408{
409 EndpointPtr ret(new addressing2::tcpip_endpoint(partner_endpoint));
410
411 return ret;
412}
413
414template <class T>
415void StreamTransport<T>::StreamConnection::register_communication_link(LinkID* link)
416{
417 if ( ! link )
418 return;
419
420 // add link
421 communication_links.push_back(link);
422}
423
424template <class T>
425void StreamTransport<T>::StreamConnection::unregister_communication_link(LinkID* link)
426{
427 if ( ! link )
428 return;
429
430
431 // remove link
432 {
433 std::vector<LinkID*>::iterator it = communication_links.begin();
434
435 while ( it != communication_links.end() )
436 {
437 if ( (*it) == link )
438 {
439 it = communication_links.erase(it);
440 }
441 else
442 {
443 ++it;
444 }
445 }
446 }
447
448 // this connection is no longer used by any link
449 if ( communication_links.empty() )
450 {
451 //XXX
452// cout << "communication_links.empty() " << getLocalEndpoint()->to_string() << " - " << getRemoteEndpoint()->to_string() << endl;
453
454 // terminate connection
455 this->terminate(); // TODO aktuell
456
457 /*
458 * TODO racecondition:
459 * When receiving a link request, the connection could closed
460 * before the request is handled.
461 *
462 * ---> Maybe wait a timeout before actually terminate the connection.
463 * (e.g. record last-used time:
464 * if last used > timeout and communication_links.empty()
465 * then terminate the connection)
466 */
467 }
468}
469
470template <class T>
471std::vector<LinkID*> StreamTransport<T>::StreamConnection::get_communication_links()
472{
473 return communication_links;
474}
475
476
477template <class T>
478void StreamTransport<T>::StreamConnection::terminate()
479{
480 parent->terminate(partner_endpoint);
481}
482
483
484/*------------------------------
485 | things we defined ourselves |
486 ------------------------------*/
487template <class T>
488void StreamTransport<T>::StreamConnection::async_connect_handler(const error_code& error)
489{
490 if (error)
491 {
492 parent->terminate(partner_endpoint);
493
494 return;
495 }
496
497 // save local endpoint
498 local_endpoint = sock.local_endpoint();
499
500 // Note: sending has to be true at this point
501 send_next_package();
502
503 listen();
504}
505
506
507template <class T>
508void StreamTransport<T>::StreamConnection::listen()
509{
510 boost::asio::async_read(
511 this->sock,
512 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)),
513 boost::bind(
514 &StreamTransport<T>::StreamConnection::async_read_header_handler,
515 this->shared_from_this(),
516 boost::asio::placeholders::error,
517 boost::asio::placeholders::bytes_transferred
518 )
519 );
520}
521
522
523template <class T>
524void StreamTransport<T>::StreamConnection::async_read_header_handler(const error_code& error, size_t bytes_transferred)
525{
526 if (error)
527 {
528 parent->terminate(partner_endpoint);
529
530 return;
531 }
532
533 // sanity checks
534 // NOTE: max size 8k (may be changed later..)
535 if ( header.length == 0 || header.length > 8192 )
536 {
537 parent->terminate(partner_endpoint);
538 }
539
540
541 // new buffer for the new packet
542 buffy = shared_buffer_t(header.length);
543
544 // * read data *
545 boost::asio::async_read(
546 this->sock,
547 boost::asio::buffer(buffy.mutable_data(), buffy.size()),
548 boost::bind(
549 &StreamTransport<T>::StreamConnection::async_read_data_handler,
550 this->shared_from_this(),
551 boost::asio::placeholders::error,
552 boost::asio::placeholders::bytes_transferred
553 )
554 );
555}
556
557template <class T>
558void StreamTransport<T>::StreamConnection::async_read_data_handler(
559 const error_code& error, size_t bytes_transferred)
560{
561 if (error)
562 {
563 parent->terminate(partner_endpoint);
564
565 return;
566 }
567
568 if ( parent->listener )
569 parent->listener->receive_message(this->shared_from_this(), buffy);
570
571 buffy = shared_buffer_t();
572 listen();
573}
574
575/* see header file for comments */
576template <class T>
577void StreamTransport<T>::StreamConnection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred)
578{
579 if ( error )
580 {
581 // remove this connection
582 parent->terminate(partner_endpoint);
583
584 return;
585 }
586
587 send_next_package();
588}
589
590
591
592template <class T>
593void StreamTransport<T>::StreamConnection::enqueue_for_sending(Packet packet, uint8_t priority)
594{
595 bool restart_sending = false;
596
597 // FIXME Mario DEBUGGING -- copy !!
598 reboost::shared_buffer_t buff = packet.linearize();
599 reboost::message_t msg;
600 msg.push_back(buff);
601 assert ( msg.MAGIC_NUMBER == 421337 );
602 // [ DEBUGGING ]
603
604
605 // enqueue packet [locked]
606 {
607 boost::mutex::scoped_lock lock(out_queues_lock);
608
609 int debuggingA = out_queues[priority].size();
610// assert ( debuggingA < 1000 ); // XXX
611
612 assert ( this->valid );
613
614 assert( priority < out_queues.size() ); // NOTE: actual assert, not in context with the extended debugging..
615// out_queues[priority].push(packet); // FIXME Mario
616 out_queues[priority].push(msg); // FIXME Mario
617
618 // XXX
619 int debuggingB = out_queues[priority].size();
620 int magic = out_queues[priority].back().MAGIC_NUMBER;
621 assert ( debuggingB == debuggingA + 1 );
622 assert ( magic == 421337 );
623
624 if ( ! sending )
625 {
626 restart_sending = true;
627 sending = true;
628 }
629 }
630
631 // if sending was stopped, we have to restart it here
632 if ( restart_sending )
633 {
634 send_next_package();
635 }
636}
637
638/* see header file for comments */
639template <class T>
640void StreamTransport<T>::StreamConnection::send_next_package()
641{
642 Packet packet;
643 bool found = false;
644
645 // XXX Mario: Debugging
646 if ( ! this->valid )
647 {
648 this->sending = false;
649 cout << "/// StreamConnection::send_next_package() on INVALID STREAM" << endl;
650 return;
651 }
652
653 // find packet with highest priority [locked]
654 {
655 boost::mutex::scoped_lock lock(out_queues_lock);
656
657 assert ( this->valid ); // XXX TODO ggf. in if (valid) Àndern...
658 assert ( this->sending );
659 assert ( this->MAGIC_NUMBER == 424242 );
660 assert ( this->out_queues.size() == 8 );
661
662 for ( vector<OutQueue>::iterator it = out_queues.begin();
663 it != out_queues.end(); it++ )
664 {
665 int debugging = it->size(); // XXX debugging
666// assert ( debugging < 1000 );
667
668 if ( !it->empty() )
669 {
670 packet = it->front();
671 it->pop();
672 found = true;
673
674 break;
675 }
676 }
677
678 // no packets waiting --> stop sending
679 if ( ! found )
680 {
681 sending = false;
682 }
683 }
684
685 // * send *
686 if ( found )
687 {
688 reboost::shared_buffer_t header_buf(sizeof(header_t));
689 header_t* header = (header_t*)(header_buf.mutable_data());
690 header->length = packet.size();
691
692 packet.push_front(header_buf);
693
694 // "convert" message to asio buffer sequence
695 vector<boost::asio::const_buffer> send_sequence(packet.length());
696 for ( int i=0; i < packet.length(); i++ )
697 {
698 shared_buffer_t b = packet.at(i);
699 send_sequence.push_back(boost::asio::buffer(b.data(), b.size()));
700 }
701
702 // * async write *
703 boost::asio::async_write(
704 this->sock,
705 send_sequence,
706 boost::bind(
707 &StreamTransport<T>::StreamConnection::async_write_handler,
708 this->shared_from_this(),
709 packet, // makes sure our shared pointer lives long enough ;-)
710 boost::asio::placeholders::error,
711 boost::asio::placeholders::bytes_transferred)
712 );
713 }
714}
715/*********************
716 ** [ inner class ] **
717 *********************/
718
719
720
721// explicitly tell the compiler to create a »tcp« (and »rfcomm«) specialization
722// --> (needed since hpp and cpp are separated)
723template class StreamTransport<boost::asio::ip::tcp>;
724
725#ifdef HAVE_LIBBLUETOOTH
726 template class StreamTransport<rfcomm>;
727#endif /* HAVE_LIBBLUETOOTH */
728
729
730
731/////////////////////////////////////////////////////////////////////////////////////
732
733
734// XXX testing
735///**
736// * Conversion between ASIO Adresses and ARIBA adresses
737// */
738///* TCP */
739//template <>
740//inline typename tcp::endpoint convert_address<tcp>( const address_v* address )
741//{
742// tcpip_endpoint endpoint = *address;
743//
744// return typename tcp::endpoint(
745// endpoint.address().asio(), endpoint.port().value()
746// );
747//}
748//
749//template <>
750//inline EndpointPtr convert_address<tcp>(const typename tcp::endpoint& endpoint)
751//{
752// ip_address address;
753// address.asio(endpoint.address());
754// tcp_port_address port;
755// port.value(endpoint.port());
756//
757//// new tcpip_endpoint(address, port);
758// tcpip_endpoint xx;
759// address_vf yy;
760// address_v* zz = yy->clone();
761// address_v::shared_ptr endp(zz); // XXX
762//
763// return endp;
764//}
765//
766///* RFCOMM */
767//#ifdef HAVE_LIBBLUETOOTH
768// template <>
769// inline typename rfcomm::endpoint convert_address<rfcomm>( const address_v* address )
770// {
771// rfcomm_endpoint endpoint = *address;
772//
773// return rfcomm::endpoint(
774// endpoint.mac().bluetooth(), endpoint.channel().value()
775// );
776// }
777//
778// template <>
779// inline address_v::shared_ptr convert_address<rfcomm>(const typename rfcomm::endpoint& endpoint)
780// {
781// mac_address mac;
782// mac.bluetooth(endpoint.address());
783// rfcomm_channel_address channel;
784// channel.value(endpoint.channel());
785//
786// address_v::shared_ptr endp((ariba::addressing::address_v*) new rfcomm_endpoint(mac, channel));
787//
788// return endp;
789// }
790//#endif /* HAVE_LIBBLUETOOTH */
791
792
793
794/////////////////////////////////////////////////////////////////////////////////////
795
796
/*
 * Get Ethernet scope ids (for link-local)
 *
 * Walks the list returned by getifaddrs() and returns the interface index
 * (if_nametoindex) of every interface whose name contains "eth" or "wlan"
 * and that has an IPv6 link-local address.
 *
 * Each index is returned at most once, even if the interface carries
 * several link-local addresses. Interfaces whose index cannot be resolved
 * are skipped. Returns an empty vector if getifaddrs() fails.
 */
std::vector<uint64_t> get_interface_scope_ids()
{
    std::vector<uint64_t> ret;

    struct ifaddrs* iface_list = NULL;
    if ( getifaddrs( &iface_list ) != 0 )
    {
        return ret;
    }

    for ( struct ifaddrs* i = iface_list; i != NULL; i = i->ifa_next )
    {
        // ignore entries without address or name
        const struct sockaddr* addr = i->ifa_addr;
        if ( addr == NULL || i->ifa_name == NULL )
        {
            continue;
        }

        // only use ethX and wlanX devices
        const std::string device(i->ifa_name);
        if ( (device.find("eth") == std::string::npos) &&
             (device.find("wlan") == std::string::npos) /* &&
             (device.find("lo") == std::string::npos) XXX */ )
        {
            continue;
        }

        // only use interfaces with ipv6 link-local addresses
        if ( addr->sa_family != AF_INET6 )
        {
            continue;
        }

        // test the binary address directly (no detour over strings)
        const struct sockaddr_in6* addr6 =
                reinterpret_cast<const struct sockaddr_in6*>(addr);
        if ( ! IN6_IS_ADDR_LINKLOCAL(&addr6->sin6_addr) )
        {
            continue;
        }

        // if_nametoindex() returns 0 when the name cannot be resolved
        const uint64_t scope_id = if_nametoindex(i->ifa_name);
        if ( scope_id == 0 )
        {
            continue;
        }

        // * append the scope_id to the return vector (once per interface) *
        bool already_listed = false;
        for ( std::vector<uint64_t>::size_type k = 0; k < ret.size(); ++k )
        {
            if ( ret[k] == scope_id )
            {
                already_listed = true;
                break;
            }
        }
        if ( ! already_listed )
        {
            ret.push_back(scope_id);
        }
    }

    freeifaddrs(iface_list);

    return ret;
}
849
850
851}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.