Changeset 10653 for source/ariba/utility/transport
- Timestamp:
- Jul 25, 2012, 11:41:36 AM (12 years ago)
- Location:
- source/ariba/utility/transport
- Files:
-
- 17 added
- 7 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/utility/transport
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/transport/tcpip/tcpip.cpp
r10075 r10653 1 1 #include "tcpip.hpp" 2 2 3 #define _NO_LOGGING 4 5 // std includes 6 #include <unistd.h> 7 #include <iostream> 8 #include <string> 9 #include <sstream> 10 #include <boost/foreach.hpp> 11 12 // protlib includes 13 #include "protlib/network_message.h" 14 #include "protlib/tp_over_tcp.h" 15 #include "protlib/tperror.h" 16 #include "protlib/logfile.h" 17 #include "protlib/queuemanager.h" 18 #include "protlib/threadsafe_db.h" 19 #include "protlib/setuid.h" 20 21 // protlib namespaces 22 using namespace protlib; 23 using namespace protlib::log; 24 25 logfile commonlog; 26 protlib::log::logfile& protlib::log::DefaultLog(commonlog); 3 #include <boost/array.hpp> 4 5 // interface discovery for link-local destinations 6 #include <ifaddrs.h> 27 7 28 8 namespace ariba { 29 9 namespace transport { 30 10 11 use_logging_cpp(tcpip) 12 31 13 using namespace ariba::addressing; 32 14 33 34 tcpip_endpoint convert( const appladdress* addr ) { 35 const char* ip_str = addr->get_ip_str(); 36 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() ); 37 return endpoint; 38 } 39 40 appladdress convert( const tcpip_endpoint& endpoint ) { 41 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint); 42 appladdress 43 peer(e->address().to_string().c_str(), "tcp", e->port().asio() ); 44 // cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl; 45 return peer; 46 } 47 48 tcpip::tcpip( uint16_t port ) : 49 done ( false ), 50 running ( false ), 51 port( port ), 52 tpreceivethread ( NULL ), 53 tpthread ( NULL ), 54 listener ( NULL ) { 55 } 56 57 tcpip::~tcpip() { 58 if (running) stop(); 59 } 60 61 bool get_message_length( NetMsg& m, uint32& clen_bytes ) { 62 clen_bytes = m.decode32(); 63 m.set_pos_r(-4); 64 return true; 65 } 66 67 void tcpip::start() { 68 done = false; 69 running = false; 70 71 // initalize netdb and setuid 72 protlib::tsdb::init(); 73 protlib::setuid::init(); 74 75 // set tcp parameters 76 port_t port = this->port; // port 77 TPoverTCPParam tppar(4, get_message_length, port); 78 79 // create receiver thread 80 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true); 81 QueueManager::instance()->register_queue(tpchecker_fq, 82 message::qaddr_signaling); 83 84 // start thread 85 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this ); 86 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar ); 87 tpthread->start_processing(); 88 } 89 90 void tcpip::stop() { 91 // stop receiver thread 92 done = true; 93 94 // stop TPoverTCP 95 tpthread->stop_processing(); 96 tpthread->abort_processing(true); 97 tpthread->wait_until_stopped(); 98 99 // unregister TPoverTCP 100 delete QueueManager::instance()->get_queue( message::qaddr_signaling ); 101 QueueManager::instance()->unregister_queue( message::qaddr_signaling ); 102 103 // destroy QueueManager 104 QueueManager::clear(); 105 106 // de-initalize netdb and setuid 107 protlib::setuid::end(); 108 protlib::tsdb::end(); 109 110 // wait for thread to finish and delete 111 pthread_join(tpreceivethread, NULL); 112 } 113 114 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) { 115 116 // prepare netmsg with length and and port 117 NetMsg* datamsg = new NetMsg(size + 6); 118 datamsg->encode32( size + 2, true ); 119 datamsg->encode16( this->port,true ); 120 121 for (size_t i=0; i<size; i++) 122 datamsg->encode8( data[i],true ); 123 124 // send message 125 tcpip_endpoint endpoint = *remote; 126 appladdress peer = convert(endpoint); 127 128 // add to output queue 129 tpthread->get_thread_object()->send( datamsg, peer, false ); 130 } 131 132 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) { 133 // send a message to each combination of ip-address and port 134 BOOST_FOREACH( const ip_address ip, endpoints.ip ) { 135 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) { 136 tcpip_endpoint endpoint(ip,port); 137 address_vf vf = endpoint; 138 send(vf,data,size); 139 } 140 } 141 } 142 143 void tcpip::terminate( const address_v* remote) { 144 tcpip_endpoint endpoint = *remote; 145 appladdress peer = convert(endpoint); 146 peer.convert_to_ipv6(); 147 tpthread->get_thread_object()->terminate( peer ); 148 } 149 150 void tcpip::register_listener( transport_listener* listener ) { 151 this->listener = listener; 152 } 153 154 void* tcpip::receiverThread( void* ptp ) { 155 // get reference to transport object 156 tcpip& tp = *((tcpip*)ptp); 157 158 // get queue 159 FastQueue* fq = 160 QueueManager::instance()->get_queue(message::qaddr_signaling); 161 162 // main processing loop 163 tp.running = true; 164 while (!tp.done) { 165 166 // wait for new message to approach 167 message* msg = fq->dequeue_timedwait(300); 168 169 // message has arrived? no-> continue 170 if (!msg) continue; 171 172 // handle transport message 173 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg); 174 if (!tpmsg) { 175 delete msg; 176 continue; 177 } 178 179 // get address & message 180 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() ); 181 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() ); 182 NetMsg* datamsg = tpmsg->get_message(); 183 184 // not a data message? -> continue! 185 if (!datamsg) { 186 delete tpmsg; 187 continue; 188 } 189 190 // get length and remote endpoint port 191 datamsg->set_pos(0); 192 uint32_t message_size = datamsg->decode32(true)-2; 193 //uint16_t remote_port = datamsg->decode16(true); 194 195 // inform listener 196 if (tp.listener != NULL) { 197 tcpip_endpoint remote = convert(remote_peer); 198 tcpip_endpoint local = convert(local_peer); 199 tp.listener->receive_message( 200 &tp, local, remote, datamsg->get_buffer()+6, message_size ); 201 } 202 203 tpmsg->set_message(NULL); 204 delete datamsg; 205 delete tpmsg; 206 } 207 // clean queue & stop 208 fq->cleanup(); 209 tp.running = false; 210 return NULL; 15 typedef boost::mutex::scoped_lock unique_lock; 16 17 tcpip::tcpip( const tcp::endpoint& endp ) : 18 listener(NULL), 19 acceptor(u_io_service.get_asio_io_service(), endp) 20 { 21 } 22 23 tcpip::~tcpip(){} 24 25 void tcpip::start() 26 { 27 // open server socket 28 accept(); 29 30 u_io_service.start(); 31 } 32 33 34 void tcpip::stop() 35 { 36 acceptor.close(); 37 38 u_io_service.stop(); 39 } 40 41 42 /* see header file for comments */ 43 void tcpip::send( 44 const tcp::endpoint& dest_addr, 45 reboost::message_t message, 46 uint8_t priority) 47 { 48 ConnPtr conn; 49 bool need_to_connect = false; 50 51 { 52 unique_lock lock(connections_lock); 53 54 ConnectionMap::iterator it = connections.find(dest_addr); 55 if (it == connections.end()) 56 { 57 ConnPtr tmp_ptr( 58 new tcpip_connection( 59 u_io_service.get_asio_io_service(), 60 shared_from_this() ) 61 ); 62 conn = tmp_ptr; 63 64 conn->partner = dest_addr; 65 conn->remote = convert_address(dest_addr); 66 67 // Note: starting the send is the obligation of the connect_handler 68 // (avoids trying to send while not connected yet) 69 conn->sending = true; 70 need_to_connect = true; 71 72 ConnectionMap::value_type item(dest_addr, conn); 73 connections.insert(item); 74 75 } else { 76 conn = it->second; 77 } 78 } 79 80 81 // * the actual send * 82 conn->enqueue_for_sending(message, priority); 83 84 // if new connection connect to the other party 85 if ( need_to_connect ) 86 { 87 conn->sock.async_connect( 88 dest_addr, 89 boost::bind( 90 &tcpip_connection::async_connect_handler, 91 conn, 92 boost::asio::placeholders::error)); 93 } 94 } 95 96 97 /* see header file for comments */ 98 void tcpip::send( 99 const address_v* remote, 100 reboost::message_t message, 101 uint8_t priority) 102 { 103 send(convert_address(remote), message, priority); 104 } 105 106 107 /* see header file for comments */ 108 void tcpip::send( 109 const endpoint_set& endpoints, 110 reboost::message_t message, 111 uint8_t priority ) 112 { 113 // network interfaces scope_ids, for link-local connections (lazy initialization) 114 vector<uint64_t> scope_ids; 115 116 // send a message to each combination of address-address and port 117 BOOST_FOREACH( const ip_address address, endpoints.ip ) { 118 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) { 119 tcp::endpoint endp(address.asio(), port.asio()); 120 121 // special treatment for link local addresses 122 // ---> send over all (suitable) interfaces 123 if ( endp.address().is_v6() ) 124 { 125 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6(); 126 127 if ( v6_addr.is_link_local() ) 128 { 129 // initialize scope_ids 130 if ( scope_ids.size() == 0 ) 131 scope_ids = get_interface_scope_ids(); 132 133 BOOST_FOREACH ( uint64_t id, scope_ids ) 134 { 135 v6_addr.scope_id(id); 136 endp.address(v6_addr); 137 138 logging_debug("------> SEND TO (link-local): " << endp); 139 // * send * 140 send(endp, message, priority); 141 } 142 } 143 144 continue; 145 } 146 147 // * send * 148 send(endp, message, priority); 149 } 150 } 151 } 152 153 154 void tcpip::register_listener( transport_listener* listener ) 155 { 156 this->listener = listener; 157 } 158 159 160 void tcpip::terminate( const address_v* remote ) 161 { 162 terminate(convert_address(remote)); 163 } 164 165 void tcpip::terminate( const tcp::endpoint& remote ) 166 { 167 ConnPtr conn; 168 169 // find and forget connection 170 { 171 unique_lock lock(connections_lock); 172 173 ConnectionMap::iterator it = connections.find(remote); 174 if (it == connections.end()) 175 { 176 return; 177 } 178 179 conn = it->second; 180 181 connections.erase(it); 182 } 183 184 // close connection 185 boost::system::error_code ec; 186 conn->sock.shutdown(tcp::socket::shutdown_both, ec); 187 conn->sock.close(ec); 188 } 189 190 191 /* private */ 192 void tcpip::accept() 193 { 194 // create new connection object 195 ConnPtr conn( 196 new tcpip_connection( 197 u_io_service.get_asio_io_service(), 198 shared_from_this() 199 ) 200 ); 201 202 // wait for incoming connection 203 acceptor.async_accept( 204 conn->sock, 205 boost::bind(&self::async_accept_handler, 206 this->shared_from_this(), 207 conn, 208 boost::asio::placeholders::error) 209 ); 210 } 211 212 void tcpip::async_accept_handler(ConnPtr conn, const error_code& error) 213 { 214 if ( ! error ) 215 { 216 conn->partner = conn->sock.remote_endpoint(); 217 conn->remote = convert_address(conn->partner); 218 conn->local = convert_address(conn->sock.local_endpoint()); 219 220 { 221 unique_lock lock(connections_lock); 222 223 ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn); 224 connections.insert(item); 225 } 226 227 // read 228 conn->listen(); 229 } 230 231 // accept further connections 232 accept(); 233 } 234 235 inline tcp::endpoint tcpip::convert_address( const address_v* address ) 236 { 237 tcpip_endpoint endpoint = *address; 238 239 return tcp::endpoint( 240 endpoint.address().asio(), endpoint.port().value() 241 ); 242 } 243 244 245 inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint) 246 { 247 ip_address address; 248 address.asio(endpoint.address()); 249 tcp_port_address port; 250 port.value(endpoint.port()); 251 return tcpip_endpoint(address, port); 252 } 253 254 255 vector<uint64_t> tcpip::get_interface_scope_ids() 256 { 257 vector<uint64_t> ret; 258 259 struct ifaddrs* ifaceBuffer = NULL; 260 void* tmpAddrPtr = NULL; 261 262 int ok = getifaddrs( &ifaceBuffer ); 263 if( ok != 0 ) return ret; 264 265 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { 266 267 // ignore devices that are disabled or have no ip 268 if(i == NULL) continue; 269 struct sockaddr* addr = i->ifa_addr; 270 if (addr==NULL) continue; 271 272 // only use ethX and wlanX devices 273 string device = string(i->ifa_name); 274 if ( (device.find("eth") == string::npos) && 275 (device.find("wlan") == string::npos) /* && 276 (device.find("lo") == string::npos) XXX */ ) 277 { 278 continue; 279 } 280 281 // only use interfaces with ipv6 link-local addresses 282 if (addr->sa_family == AF_INET6) 283 { 284 // convert address 285 // TODO should be possible without detour over strings 286 char straddr[INET6_ADDRSTRLEN]; 287 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 288 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 289 290 address_v6 v6addr = address_v6::from_string(straddr); 291 if ( v6addr.is_link_local() ) 292 { 293 // * append the scope_id to the return vector * 294 ret.push_back(if_nametoindex(i->ifa_name)); 295 } 296 297 } 298 } 299 300 freeifaddrs(ifaceBuffer); 301 302 return ret; 303 } 304 305 306 /***************** 307 ** inner class ** 308 *****************/ 309 310 tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent) : 311 sock(io_service), 312 valid(true), 313 parent(parent), 314 out_queues(8), //TODO How much priorities shall we have? 315 sending(false) 316 { 317 header.length = 0; 318 header.prot = 0; 319 } 320 321 /*------------------------------------------- 322 | implement transport_connection interface | 323 -------------------------------------------*/ 324 void tcpip::tcpip_connection::send( 325 reboost::message_t message, 326 uint8_t priority) 327 { 328 enqueue_for_sending(message, priority); 329 } 330 331 332 address_vf tcpip::tcpip_connection::getLocalEndpoint() 333 { 334 return local; 335 } 336 337 338 address_vf tcpip::tcpip_connection::getRemoteEndpoint() 339 { 340 return remote; 341 } 342 343 344 void tcpip::tcpip_connection::terminate() 345 { 346 parent->terminate(partner); 347 } 348 349 350 /*------------------------------ 351 | things we defined ourselves | 352 ------------------------------*/ 353 void tcpip::tcpip_connection::async_connect_handler(const error_code& error) 354 { 355 if (error) 356 { 357 parent->terminate(partner); 358 359 return; 360 } 361 362 // save address in ariba format 363 local = parent->convert_address(sock.local_endpoint()); 364 365 // Note: sending has to be true at this point 366 send_next_package(); 367 368 listen(); 369 } 370 371 372 void tcpip::tcpip_connection::listen() 373 { 374 boost::asio::async_read( 375 this->sock, 376 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)), 377 boost::bind( 378 &tcpip::tcpip_connection::async_read_header_handler, 379 this->shared_from_this(), 380 boost::asio::placeholders::error, 381 boost::asio::placeholders::bytes_transferred 382 ) 383 ); 384 } 385 386 387 void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred) 388 { 389 if (error) 390 { 391 parent->terminate(partner); 392 393 return; 394 } 395 396 // convert byte order 397 header.length = ntohl(header.length); 398 header.length -= 2; // XXX protlib 399 400 assert(header.length > 0); 401 402 // new buffer for the new packet 403 buffy = shared_buffer_t(header.length); 404 405 // * read data * 406 boost::asio::async_read( 407 this->sock, 408 boost::asio::buffer(buffy.mutable_data(), buffy.size()), 409 boost::bind( 410 &tcpip::tcpip_connection::async_read_data_handler, 411 this->shared_from_this(), 412 boost::asio::placeholders::error, 413 boost::asio::placeholders::bytes_transferred 414 ) 415 ); 416 } 417 418 void tcpip::tcpip_connection::async_read_data_handler( 419 const error_code& error, size_t bytes_transferred) 420 { 421 if (error) 422 { 423 parent->terminate(partner); 424 425 return; 426 } 427 428 message_t msg; 429 msg.push_back(buffy); 430 buffy = shared_buffer_t(); 431 432 if ( parent->listener ) 433 parent->listener->receive_message(shared_from_this(), msg); 434 435 listen(); 436 } 437 438 /* see header file for comments */ 439 void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred) 440 { 441 if ( error ) 442 { 443 // remove this connection 444 parent->terminate(partner); 445 446 return; 447 } 448 449 send_next_package(); 450 } 451 452 453 454 void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority) 455 { 456 bool restart_sending = false; 457 458 // enqueue packet [locked] 459 { 460 unique_lock(out_queues_lock); 461 462 assert( priority < out_queues.size() ); 463 out_queues[priority].push(packet); 464 465 if ( ! sending ) 466 { 467 restart_sending = true; 468 sending = true; 469 } 470 } 471 472 // if sending was stopped, we have to restart it here 473 if ( restart_sending ) 474 { 475 send_next_package(); 476 } 477 } 478 479 /* see header file for comments */ 480 void tcpip::tcpip_connection::send_next_package() 481 { 482 Packet packet; 483 bool found = false; 484 485 // find packet with highest priority [locked] 486 { 487 unique_lock(out_queues_lock); 488 489 for ( vector<OutQueue>::iterator it = out_queues.begin(); 490 it != out_queues.end(); it++ ) 491 { 492 if ( !it->empty() ) 493 { 494 packet = it->front(); 495 it->pop(); 496 found = true; 497 498 break; 499 } 500 } 501 502 // no packets waiting --> stop sending 503 if ( ! found ) 504 { 505 sending = false; 506 } 507 } 508 509 // * send * 510 if ( found ) 511 { 512 reboost::shared_buffer_t header_buf(sizeof(header_t)); 513 header_t* header = (header_t*)(header_buf.mutable_data()); 514 header->length = htonl(packet.size()+2); // XXX protlib 515 516 packet.push_front(header_buf); 517 518 // "convert" message to asio buffer sequence 519 vector<boost::asio::const_buffer> send_sequence(packet.length()); 520 for ( int i=0; i < packet.length(); i++ ) 521 { 522 shared_buffer_t b = packet.at(i); 523 send_sequence.push_back(boost::asio::buffer(b.data(), b.size())); 524 } 525 526 // * async write * 527 boost::asio::async_write( 528 this->sock, 529 send_sequence, 530 boost::bind( 531 &tcpip::tcpip_connection::async_write_handler, 532 this->shared_from_this(), 533 packet, // makes sure our shared pointer lives long enough ;-) 534 boost::asio::placeholders::error, 535 boost::asio::placeholders::bytes_transferred) 536 ); 537 } 211 538 } 212 539 -
source/ariba/utility/transport/tcpip/tcpip.hpp
r5993 r10653 3 3 4 4 #include "ariba/utility/transport/transport.hpp" 5 #include <pthread.h>6 7 // forward declaration 8 namespace protlib { 9 template<class X, class Y>10 class ThreadStarter; 11 class TPoverTCP; 12 class TPoverTCPParam; 13 } 5 #include "ariba/utility/transport/asio/unique_io_service.h" 6 #include "ariba/utility/transport/transport_connection.hpp" 7 #include "ariba/utility/addressing/tcpip_endpoint.hpp" 8 #include <boost/asio.hpp> 9 #include <boost/shared_ptr.hpp> 10 #include <boost/enable_shared_from_this.hpp> 11 #include <queue> 12 #include "ariba/utility/transport/messages/buffers.hpp" 13 #include "ariba/utility/logging/Logging.h" 14 14 15 15 namespace ariba { 16 16 namespace transport { 17 17 18 using namespace protlib; 18 using namespace std; 19 using ariba::transport::detail::unique_io_service; 20 using ariba::addressing::tcpip_endpoint; 21 using boost::asio::ip::tcp; 22 using boost::asio::ip::address_v6; 23 using boost::system::error_code; 24 using reboost::shared_buffer_t; 25 using reboost::message_t; 19 26 20 /** 21 * TODO: Doc 22 * 23 * @author Sebastian Mies <mies@tm.uka.de> 24 */ 25 class tcpip : public transport_protocol { 27 class tcpip; 28 typedef boost::shared_ptr<tcpip> TcpIpPtr; 29 30 class tcpip : 31 public transport_protocol, 32 public boost::enable_shared_from_this<tcpip> 33 { 34 typedef tcpip self; 35 use_logging_h(tcpip) 36 37 private: 38 class tcpip_connection : 39 public transport_connection, 40 public boost::enable_shared_from_this<tcpip_connection> 41 { 42 public: 43 typedef reboost::message_t Packet; 44 typedef std::queue<Packet> OutQueue; 45 46 struct header_t 47 { 48 uint32_t length; 49 uint16_t prot; // XXX protlib 50 } __attribute__((packed)); 51 52 tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent); 53 54 /// Inherited from transport_connection 55 virtual void send(reboost::message_t message, uint8_t priority = 0); 56 virtual address_vf getLocalEndpoint(); 57 virtual address_vf getRemoteEndpoint(); 58 virtual void terminate(); 59 60 void listen(); 61 62 void async_connect_handler(const error_code& error); 63 64 void async_read_header_handler(const error_code& error, size_t bytes_transferred); 65 void async_read_data_handler(const error_code& error, size_t bytes_transferred); 66 67 /* 68 * is called from asio when write operation "returns", 69 * calls private function `send_next_package()` 70 */ 71 void async_write_handler( 72 reboost::shared_buffer_t packet, 73 const error_code& error, 74 size_t bytes_transferred); 75 76 77 void enqueue_for_sending(Packet packet, uint8_t priority); 78 79 private: 80 /* 81 * is called from `send` or `async_write_handler` to begin/keep sending 82 * sends the next message with the highest priority in this connection 83 */ 84 void send_next_package(); 85 86 87 public: 88 tcp::socket sock; 89 bool valid; 90 TcpIpPtr parent; 91 92 tcp::endpoint partner; 93 tcpip_endpoint remote; 94 tcpip_endpoint local; 95 96 vector<OutQueue> out_queues; // to be locked with out_queues_lock 97 boost::mutex out_queues_lock; 98 99 bool sending; // to be locked with out_queues_lock 100 101 header_t header; 102 shared_buffer_t buffy; 103 }; 104 typedef boost::shared_ptr<tcpip_connection> ConnPtr; 105 typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap; 106 26 107 public: 27 tcpip( uint16_t port);108 tcpip( const tcp::endpoint& endp ); 28 109 virtual ~tcpip(); 29 110 virtual void start(); 30 111 virtual void stop(); 31 virtual void send( const address_v* remote, const uint8_t* data, size_t size ); 32 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ); 112 113 /** 114 * enqueues message for sending 115 * create new connection if necessary 116 * starts sending mechanism (if not already running) 117 */ 118 void send( 119 const tcp::endpoint&, 120 reboost::message_t message, 121 uint8_t priority = 0 ); 122 123 /** 124 * Converts address_v to tcp::endpoint and calls the real send() function 125 */ 126 virtual void send( 127 const address_v* remote, 128 reboost::message_t message, 129 uint8_t priority = 0 ); 130 131 /** 132 * calls send for each destination endpoint in `endpoint_set& endpoints` 133 */ 134 virtual void send( 135 const endpoint_set& endpoints, 136 reboost::message_t message, 137 uint8_t priority = 0 ); 138 33 139 virtual void terminate( const address_v* remote ); 140 virtual void terminate( const tcp::endpoint& remote ); 34 141 virtual void register_listener( transport_listener* listener ); 35 142 143 144 /** 145 * returns a vector of (interesting) network interfaces 146 * 147 * [NOTE: The current implementation returns the scope_ids of 148 * all ethX and wlanX network interfaces, to be used for 149 * connections to link-local ipv6 addresses.] 150 * 151 * TODO move to ariba/communication/networkinfo/AddressDiscovery ?? 152 * 153 */ 154 static vector<uint64_t> get_interface_scope_ids(); 155 36 156 private: 37 volatile bool done, running; 38 uint16_t port; 39 pthread_t tpreceivethread; 40 ThreadStarter<TPoverTCP, TPoverTCPParam>* tpthread; 41 static void* receiverThread( void* ptp ); 157 void accept(); 158 void async_accept_handler(ConnPtr conn, const error_code& error); 159 tcp::endpoint convert_address(const address_v* endpoint); 160 tcpip_endpoint convert_address(const tcp::endpoint& endpoint); 161 162 private: 42 163 transport_listener* listener; 164 unique_io_service u_io_service; 165 tcp::acceptor acceptor; 166 167 ConnectionMap connections; 168 boost::mutex connections_lock; 43 169 }; 44 170 -
source/ariba/utility/transport/transport.hpp
r5284 r10653 8 8 // transport protocol implementations 9 9 #include "tcpip/tcpip.hpp" 10 #include "rfcomm/rfcomm .hpp"10 #include "rfcomm/rfcomm_transport.hpp" 11 11 12 12 // common transport peer using all known protocols -
source/ariba/utility/transport/transport_listener.hpp
r5993 r10653 5 5 6 6 #include "ariba/utility/addressing/addressing.hpp" 7 #include "ariba/utility/transport/messages/buffers.hpp" 8 #include "ariba/utility/transport/transport_connection.hpp" 7 9 8 10 // namespace ariba::transport … … 11 13 12 14 using namespace ariba::addressing; 13 14 class transport_protocol;15 15 16 16 /** … … 21 21 class transport_listener { 22 22 public: 23 /// Allow deleting implementing classes by pointer 24 virtual ~transport_listener() {} 25 23 26 /// called when a message is received 24 27 virtual void receive_message( 25 transport_protocol* transport, 26 const address_vf local, const address_vf remote, 27 const uint8_t* data, size_t size 28 transport_connection::sptr connection, 29 reboost::message_t msg 28 30 ) { 29 31 std::cout << "transport_listener: not implemented" << std::endl; -
source/ariba/utility/transport/transport_peer.cpp
r7834 r10653 3 3 #include "transport_peer.hpp" 4 4 #include "transport.hpp" 5 #include "ariba/utility/logging/Logging.h" 6 #include <boost/asio/ip/tcp.hpp> 7 #include <boost/asio/error.hpp> 8 #include <boost/foreach.hpp> 9 10 #ifdef ECLIPSE_PARSER 11 #define foreach(a, b) for(a : b) 12 #else 13 #define foreach(a, b) BOOST_FOREACH(a, b) 14 #endif 5 15 6 16 // namespace ariba::transport … … 9 19 10 20 using namespace ariba::addressing; 21 using boost::asio::ip::tcp; 22 23 #ifdef HAVE_LIBBLUETOOTH 24 using boost::asio::bluetooth::rfcomm; 25 #endif 26 27 use_logging_cpp(transport_peer); 11 28 12 29 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) { 13 // setup tcp transports 14 tcp = NULL; 15 //cout << "#tcpip_transports = " << local.tcp.size() << endl; 16 if (local.tcp.size()==1) { 17 tcp = new tcpip(local.tcp.begin()->value()); 18 //cout << "Started tcpip_transport on port " << local.tcp.begin()->value() << endl; 19 } 20 30 31 // setup tcp transports 32 foreach(tcp_port_address port, local.tcp) { 33 34 if (local.ip.size() > 0) { 35 foreach(ip_address ip_addr, local.ip) { 36 37 tcp::endpoint endp(ip_addr.asio(), port.asio()); 38 create_service(endp); 39 } 40 } else { 41 tcp::endpoint endp_v6(tcp::v6(), port.asio()); 42 tcp::endpoint endp_v4(tcp::v4(), port.asio()); 43 44 create_service(endp_v6); 45 create_service(endp_v4); 46 } 47 48 } 49 21 50 #ifdef HAVE_LIBBLUETOOTH 22 // setup rfcomm transports 23 rfc = NULL; 24 //cout << "#rfcomm_transports = " << local.rfcomm.size() << endl; 25 if ( local.rfcomm.size() == 1 ) { 26 rfc = new rfcomm( local.rfcomm.begin()->value() ); 27 //cout << "Started rfcomm_transport on port " << local.rfcomm.begin()->value() << endl; 28 } 51 foreach(rfcomm_channel_address channel, local.rfcomm) { 52 if (local.bluetooth.size() > 0) { 53 foreach(mac_address mac, local.bluetooth) { 54 rfcomm::endpoint endp(mac.bluetooth(), channel.value()); 55 create_service(endp); 56 } 57 } else { 58 rfcomm::endpoint endp(channel.value()); 59 create_service(endp); 60 } 61 } 29 62 #endif 30 63 } 31 64 65 void transport_peer::create_service(tcp::endpoint endp) { 66 try { 67 TcpIpPtr tmp_ptr(new tcpip(endp)); 68 tcps.push_back(tmp_ptr); 69 logging_info("Listening on IP/TCP " << endp); 70 71 } catch (boost::system::system_error& e) { 72 if (e.code() == boost::asio::error::address_in_use) { 73 logging_warn("[WARN] Address already in use: " 74 << endp << ". Endpoint will be ignored!"); 75 } else { 76 // Rethrow 77 throw; 78 } 79 } 80 } 81 82 #ifdef HAVE_LIBBLUETOOTH 83 void transport_peer::create_service(rfcomm::endpoint endp) { 84 try { 85 rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp)); 86 rfcomms.push_back(tmp_ptr); 87 logging_info("Listening on bluetooth/RFCOMM " << endp); 88 89 } catch (boost::system::system_error& e) { 90 if (e.code() == boost::asio::error::address_in_use) { 91 logging_warn("[WARN] Address already in use: " 92 << endp << ". Endpoint will be ignored!"); 93 } else { 94 // Rethrow 95 throw; 96 } 97 } 98 } 99 #endif 100 32 101 transport_peer::~transport_peer() { 33 if (tcp !=NULL ) delete tcp;34 #ifdef HAVE_LIBBLUETOOTH35 if (rfc !=NULL ) delete rfc;36 #endif37 102 } 38 103 39 104 void transport_peer::start() { 40 if (tcp!=NULL) tcp->start(); 105 foreach(TcpIpPtr tcp, tcps) { 106 tcp->start(); 107 } 108 41 109 #ifdef HAVE_LIBBLUETOOTH 42 if (rfc!=NULL) rfc->start(); 110 foreach(rfcomm_transport::sptr x, rfcomms) { 111 x->start(); 112 } 43 113 #endif 44 114 } 45 115 46 116 void transport_peer::stop() { 47 if (tcp!=NULL) tcp->stop(); 117 foreach(TcpIpPtr tcp, tcps) { 118 tcp->stop(); 119 } 120 48 121 #ifdef HAVE_LIBBLUETOOTH 49 if (rfc!=NULL) rfc->stop(); 122 foreach(rfcomm_transport::sptr x, rfcomms) { 123 x->stop(); 124 } 50 125 #endif 51 126 } 52 127 53 void transport_peer::send( const address_v* remote, const uint8_t* data, size_t size ) { 54 if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) { 55 tcp->send(remote,data,size); 56 } else 128 129 void transport_peer::send( 130 const endpoint_set& endpoints, 131 reboost::message_t message, 132 uint8_t priority) 133 { 134 foreach(TcpIpPtr tcp, tcps) { 135 tcp->send(endpoints, message, priority); 136 } 137 57 138 #ifdef HAVE_LIBBLUETOOTH 58 if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) { 59 rfc->send(remote,data,size); 60 } else 61 #endif 62 cerr << "Could not send message to " << remote->to_string() << endl; 63 } 64 65 void transport_peer::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) { 66 if (tcp!=NULL) tcp->send(endpoints,data,size); 67 #ifdef HAVE_LIBBLUETOOTH 68 if (rfc!=NULL) rfc->send(endpoints,data,size); 139 foreach(rfcomm_transport::sptr x, rfcomms) { 140 x->send(endpoints, message, priority); 141 } 69 142 #endif 70 143 } 71 144 72 145 void transport_peer::terminate( const address_v* remote ) { 73 if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) 74 tcp->terminate(remote); 146 if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 147 { 148 foreach(TcpIpPtr tcp, tcps) { 149 tcp->terminate(remote); 150 } 151 } 75 152 #ifdef HAVE_LIBBLUETOOTH 76 if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) 77 rfc->terminate(remote); 153 if (remote->instanceof<rfcomm_endpoint>()) { 154 foreach(rfcomm_transport::sptr x, rfcomms) { 155 x->terminate(remote); 156 } 157 } 78 158 #endif 79 159 } 80 160 81 161 void transport_peer::register_listener( transport_listener* listener ) { 82 if (tcp!=NULL) tcp->register_listener(listener); 162 foreach(TcpIpPtr tcp, tcps) { 163 tcp->register_listener(listener); 164 } 165 83 166 #ifdef HAVE_LIBBLUETOOTH 84 if (rfc!=NULL) rfc->register_listener(listener); 167 foreach(rfcomm_transport::sptr x, rfcomms) { 168 x->register_listener(listener); 169 } 85 170 #endif 86 171 } -
source/ariba/utility/transport/transport_peer.hpp
r9324 r10653 5 5 #include "transport_protocol.hpp" 6 6 #include "ariba/utility/addressing/endpoint_set.hpp" 7 #include <boost/shared_ptr.hpp> 8 #include "rfcomm/bluetooth_rfcomm.hpp" 9 7 10 8 11 // namespace ariba::transport … … 13 16 14 17 class tcpip; 18 15 19 #ifdef HAVE_LIBBLUETOOTH 16 class rfcomm ;20 class rfcomm_transport; 17 21 #endif 18 22 … … 30 34 virtual void start(); 31 35 virtual void stop(); 32 virtual void send( const address_v* remote, const uint8_t* data, size_t size ); 33 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ); 36 37 virtual void send( 38 const endpoint_set& endpoints, 39 reboost::message_t message, 40 uint8_t priority = 0); 41 42 /// @deprecated: Use terminate() from transport_connection instead 34 43 virtual void terminate( const address_v* remote ); 44 35 45 virtual void register_listener( transport_listener* listener ); 36 46 37 47 private: 48 void create_service(tcp::endpoint endp); 49 #ifdef HAVE_LIBBLUETOOTH 50 void create_service(boost::asio::bluetooth::rfcomm::endpoint endp); 51 #endif 52 38 53 endpoint_set& local; 39 tcpip* tcp;54 std::vector< boost::shared_ptr<tcpip> > tcps; 40 55 #ifdef HAVE_LIBBLUETOOTH 41 rfcomm* rfc;56 std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms; 42 57 #endif 43 58 }; -
source/ariba/utility/transport/transport_protocol.hpp
r5993 r10653 3 3 4 4 #include "ariba/utility/addressing/addressing.hpp" 5 #include "transport_listener.hpp" 5 #include "ariba/utility/transport/transport_listener.hpp" 6 #include "ariba/utility/transport/messages/message.hpp" 6 7 7 8 // namespace ariba::transport … … 18 19 class transport_protocol { 19 20 public: 21 /// Allow deleting implementing classes by pointer 22 virtual ~transport_protocol() {} 23 20 24 virtual void start() = 0; 21 25 virtual void stop() = 0; 22 virtual void send( const address_v* remote, const uint8_t* data, size_t size ) = 0; 23 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) = 0; 26 27 virtual void send( 28 const endpoint_set& endpoints, 29 reboost::message_t message, 30 uint8_t priority = 0) = 0; 31 32 /// @deprecated: Use terminate() from transport_connection instead 24 33 virtual void terminate( const address_v* remote ) = 0; 34 25 35 virtual void register_listener( transport_listener* listener ) = 0; 26 36 };
Note:
See TracChangeset
for help on using the changeset viewer.