Changeset 12060 for source/ariba/communication
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (12 years ago)
- Location:
- source/ariba/communication
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/communication/BaseCommunication.cpp
r10767 r12060 57 57 namespace communication { 58 58 59 using namespace ariba::addressing2; 60 59 61 using ariba::utility::PeerID; 60 62 using ariba::utility::SystemQueue; 61 63 62 64 use_logging_cpp(BaseCommunication); 63 64 /// adds an endpoint to the list65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {66 if (endpoint==NULL) return;67 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {68 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {69 ref.count++;70 return;71 }72 }73 endpoint_reference ref;74 ref.endpoint = endpoint->clone();75 ref.count = 1;76 remote_endpoints.push_back(ref);77 }78 79 /// removes an endpoint from the list80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {81 if (endpoint==NULL) return;82 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();83 i != remote_endpoints.end(); i++) {84 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {85 i->count--;86 if (i->count==0) {87 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");88 transport->terminate(i->endpoint);89 delete i->endpoint;90 remote_endpoints.erase(i);91 }92 return;93 }94 }95 }96 65 97 66 … … 100 69 transport( NULL ), 101 70 messageReceiver( NULL ), 102 started( false ) 71 started( false ), 72 listenOn_endpoints(new addressing2::endpoint_set()) 103 73 { 104 74 } … … 109 79 110 80 111 void BaseCommunication::start() { 81 void BaseCommunication::start(EndpointSetPtr listen_on) { 82 assert ( ! started ); 83 84 listenOn_endpoints = listen_on; 85 logging_info("Setting local end-points: " << listenOn_endpoints->to_string()); 86 112 87 logging_info( "Starting up ..." ); 113 88 currentSeqnum = 0; 114 89 115 // set local peer id116 localDescriptor.getPeerId() = PeerID::random();117 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );118 119 90 // creating transports 91 // ---> transport_peer holds the set of the active endpoints we're listening on 120 92 logging_info( "Creating transports ..." ); 121 122 #ifdef UNDERLAY_OMNET 123 AribaOmnetModule* module = StartupWrapper::getCurrentModule(); 124 module->setServerPort( listenport ); 125 126 transport = module; 127 network = new OmnetNetworkProtocol( module ); 128 #else 129 transport = new transport_peer( localDescriptor.getEndpoints() ); 130 #endif 93 transport = new transport_peer(); 94 active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints); 95 logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX 131 96 132 97 logging_info( "Searching for local locators ..." ); 133 /** 134 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0) 135 * since addresses are used to initialize transport addresses 136 */ 137 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() ); 138 logging_info( "Done. Local endpoints = " << localDescriptor.toString() ); 139 98 local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints); 99 if ( local_endpoints->count() > 0 ) 100 { 101 logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() ); 102 } 103 else 104 { 105 logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!"); 106 107 // TODO notify application, so that it may react properly. throw exception..? 108 assert( false ); 109 } 110 111 112 // create local EndpointDescriptor 113 // ---> localDescriptor hold the set endpoints that can be used to reach us 114 localDescriptor.getPeerId() = PeerID::random(); 115 localDescriptor.replace_endpoint_set(local_endpoints); 116 logging_info( "Using PeerID: " << localDescriptor.getPeerId() ); 117 118 // start transport_peer 140 119 transport->register_listener( this ); 141 120 transport->start(); 142 121 143 #ifndef UNDERLAY_OMNET144 122 // bind to the network change detection 145 123 networkMonitor.registerNotification( this ); 146 #endif147 124 148 125 // base comm startup done … … 163 140 bool BaseCommunication::isStarted(){ 164 141 return started; 165 }166 167 /// Sets the endpoints168 void BaseCommunication::setEndpoints( string& _endpoints ) {169 localDescriptor.getEndpoints().assign(_endpoints);170 logging_info("Setting local end-points: "171 << localDescriptor.getEndpoints().to_string());172 142 } 173 143 … … 193 163 addLink( ld ); 194 164 195 // send a message to request new link to remote 196 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 197 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid ); 198 baseMsg.getLocalDescriptor() = localDescriptor; 199 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId(); 200 201 // serialize and send message 202 send( &baseMsg, descriptor ); 165 166 /* send a message to request new link to remote */ 167 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 168 169 /* 170 * Create Link-Request Message: 171 * NOTE: - Their PeerID (in parent message) 172 * - Our LinkID 173 * - Our PeerID 174 * - Our EndpointDescriptor 175 */ 176 reboost::message_t linkmsg; 177 linkmsg.push_back(linkid.serialize()); 178 linkmsg.push_back(localDescriptor.getPeerId().serialize()); 179 linkmsg.push_back(localDescriptor.endpoints->serialize()); 180 181 // // XXX AKTUELL BUG FINDING... 182 // reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize(); 183 // EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet(); 184 // xxx_set->deserialize(xxx); 185 // cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl; 186 // cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl; 187 188 // send message 189 // TODO move enum to BaseComm 190 send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg, 191 descriptor, system_priority::OVERLAY); 203 192 204 193 return linkid; … … 217 206 218 207 // tell the registered listeners 219 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {208 foreach( CommunicationEvents* i, eventListener ) { 220 209 i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); 221 210 } 222 211 223 // create message to drop the link 212 213 // * send message to drop the link * 224 214 logging_debug( "Sending out link close request. for us, the link is closed now" ); 225 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink ); 226 227 // send message to drop the link 228 send( &msg, ld ); 215 reboost::message_t empty_message; 216 send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY ); 229 217 230 218 // remove from map … … 232 220 } 233 221 234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 235 236 logging_debug( "Sending out message to link " << lid.toString() ); 237 222 223 seqnum_t BaseCommunication::sendMessage( const LinkID& lid, 224 reboost::message_t message, 225 uint8_t priority, 226 bool bypass_overlay) throw(communication_message_not_sent) 227 { 228 // message type: direct data or (normal) data 229 AribaBaseMsg::type_ type; 230 if ( bypass_overlay ) 231 { 232 type = AribaBaseMsg::typeDirectData; 233 logging_debug( "Sending out direct-message to link " << lid.toString() ); 234 } 235 else 236 { 237 type = AribaBaseMsg::typeData; 238 logging_debug( "Sending out message to link " << lid.toString() ); 239 } 240 241 238 242 // query local link info 239 243 LinkDescriptor& ld = queryLocalLink(lid); 240 if( ld.isUnspecified() ){ 241 logging_error( "Don't know the link with id " << lid.toString() ); 242 return -1; 244 if( ld.isUnspecified() ) 245 { 246 throw communication_message_not_sent("Don't know the link with id " 247 + lid.toString()); 243 248 } 244 249 245 250 // link not up-> error 246 if( !ld.up ) {247 logging_error("Can not send on link " << lid.toString() << ": link not up");248 return -1;249 }250 251 // create message 252 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink ); 253 254 // encapsulate the payload message255 msg.encapsulate( const_cast<Message*>(message) ); 256 257 // send message258 send( &msg, ld);259 260 // return sequence number251 if( !ld.up ) 252 { 253 throw communication_message_not_sent("Can not send on link " 254 + lid.toString() + ": link not up"); 255 } 256 257 258 // * send message * 259 bool okay = send_over_link( type, message, ld, priority ); 260 261 if ( ! okay ) 262 { 263 throw communication_message_not_sent("send_over_link failed!"); 264 } 265 261 266 return ++currentSeqnum; 262 267 } … … 268 273 LinkDescriptor& linkDesc = queryLocalLink(link); 269 274 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); 270 return linkDesc.remote Endpoint;275 return linkDesc.remoteDescriptor; 271 276 } 272 277 } … … 283 288 } 284 289 285 SystemEventType TransportEvent("Transport"); 286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 287 288 /// called when a system event is emitted by system queue 289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { 290 291 // dispatch received messages 292 if ( event.getType() == MessageDispatchEvent ){ 293 logging_debug( "Forwarding message receiver" ); 294 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 295 (*handler)(); 296 delete handler; 297 } 298 } 299 300 /** 301 * called within the ASIO thread 302 * when a message is received from underlay transport 303 */ 290 291 292 /*------------------------------ 293 | ASIO thread --> SystemQueue | 294 ------------------------------*/ 295 296 /// ASIO thread 304 297 void BaseCommunication::receive_message(transport_connection::sptr connection, 305 reboost:: message_t msg) {298 reboost::shared_buffer_t msg) { 306 299 307 300 logging_debug( "Dispatching message" ); 308 301 309 boost::function0<void>* handler = new boost::function0<void>(302 SystemQueue::instance().scheduleCall( 310 303 boost::bind( 311 304 &BaseCommunication::receiveMessage, … … 313 306 connection, 314 307 msg) 315 ); 316 317 SystemQueue::instance().scheduleEvent( 318 SystemEvent(this, MessageDispatchEvent, handler) 319 ); 320 } 321 322 /** 323 * called within the ARIBA thread (System Queue) 324 * when a message is received from underlay transport 325 */ 308 ); 309 } 310 311 /// ASIO thread 312 void BaseCommunication::connection_terminated(transport_connection::sptr connection) 313 { 314 SystemQueue::instance().scheduleCall( 315 boost::bind( 316 &BaseCommunication::connectionTerminated, 317 this, 318 connection) 319 ); 320 } 321 322 /*-------------------------------- 323 | [ASIO thread --> SystemQueue] | 324 -------------------------------*/ 325 326 /// ARIBA thread (System Queue) 327 void BaseCommunication::connectionTerminated(transport_connection::sptr connection) 328 { 329 vector<LinkID*> links = connection->get_communication_links(); 330 331 logging_debug("[BaseCommunication] Connection terminated: " 332 << connection->getLocalEndpoint()->to_string() 333 << " <--> " << connection->getRemoteEndpoint()->to_string() 334 << " (" << links.size() << " links)"); 335 336 // remove all links that used the terminated connection 337 for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it ) 338 { 339 LinkID& link_id = **it; 340 341 logging_debug(" ---> Removing link: " << link_id.toString()); 342 343 // searching for link, not found-> warn 344 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 345 if (linkDesc.isUnspecified()) { 346 logging_warn("Failed to find local link " << link_id.toString()); 347 continue; 348 } 349 350 // inform listeners 351 foreach( CommunicationEvents* i, eventListener ){ 352 i->onLinkFail( linkDesc.localLink, 353 linkDesc.localLocator, linkDesc.remoteLocator ); 354 } 355 356 // remove the link descriptor 357 removeLink( link_id ); 358 } 359 } 360 361 /// ARIBA thread (System Queue) 326 362 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 327 reboost:: message_t message)363 reboost::shared_buffer_t message) 328 364 { 329 330 //// Adapt to old message system //// 331 // Copy data 332 size_t bytes_len = message.size(); 333 uint8_t* bytes = new uint8_t[bytes_len]; 334 message.read(bytes, 0, bytes_len); 335 336 Data data(bytes, bytes_len * 8); 337 338 Message legacy_message; 339 legacy_message.setPayload(data); 340 341 342 343 /// decapsulate message 344 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 345 logging_debug( "Receiving message of type " << msg->getTypeString() ); 346 365 // XXX 366 logging_debug("/// [receiveMessage] buffersize: " << message.size()); 367 368 // get type 369 uint8_t type = message.data()[0]; 370 reboost::shared_buffer_t sub_buff = message(1); 371 372 // get link id 373 LinkID link_id; 374 if ( type != AribaBaseMsg::typeLinkRequest) 375 { 376 sub_buff = link_id.deserialize(sub_buff); 377 } 378 347 379 // handle message 348 switch ( msg->getType()) {349 380 switch ( type ) 381 { 350 382 // --------------------------------------------------------------------- 351 383 // data message 352 384 // --------------------------------------------------------------------- 353 case AribaBaseMsg::typeData: { 354 logging_debug( "Received data message, forwarding to overlay" ); 355 if( messageReceiver != NULL ) { 385 case AribaBaseMsg::typeData: 386 { 387 logging_debug( "Received data message, forwarding to overlay." ); 388 if( messageReceiver != NULL ) 389 { 356 390 messageReceiver->receiveMessage( 357 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED391 sub_buff, link_id, NodeID::UNSPECIFIED, false 358 392 ); 359 393 } 394 360 395 break; 361 396 } 362 397 398 // --------------------------------------------------------------------- 399 // direct data message (bypass overlay-layer) 400 // --------------------------------------------------------------------- 401 case AribaBaseMsg::typeDirectData: 402 { 403 logging_debug( "Received direct data message, forwarding to application." ); 404 405 if( messageReceiver != NULL ) 406 { 407 messageReceiver->receiveMessage( 408 sub_buff, link_id, NodeID::UNSPECIFIED, true 409 ); 410 } 411 412 break; 413 } 414 415 416 363 417 // --------------------------------------------------------------------- 364 418 // handle link request from remote 365 419 // --------------------------------------------------------------------- 366 case AribaBaseMsg::typeLinkRequest: { 367 logging_debug( "Received link open request" ); 368 369 /// not the correct peer id-> skip request 370 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified() 371 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) { 372 logging_info("Received link request for " 373 << msg->getRemoteDescriptor().getPeerId().toString() 374 << "but i'm " 375 << localDescriptor.getPeerId() 376 << ": Ignoring!"); 377 break; 378 } 379 420 case AribaBaseMsg::typeLinkRequest: 421 { 422 logging_debug( "Received link open request on " 423 << connection->getLocalEndpoint()->to_string() ); 424 425 /* 426 * Deserialize Peer Message 427 * - Our PeerID 428 */ 429 PeerID our_peer_id; 430 sub_buff = our_peer_id.deserialize(sub_buff); 431 432 /// not the correct peer id-> skip request 433 if ( our_peer_id != localDescriptor.getPeerId() && 434 ! our_peer_id.isUnspecified() /* overlay bootstrap */ ) 435 { 436 logging_info("Received link request for " 437 << our_peer_id.toString() 438 << "but i'm " 439 << localDescriptor.getPeerId() 440 << ": Ignoring!"); 441 442 // TODO terminate connection? 443 444 break; 445 } 446 447 448 /* 449 * Deserialize Link-Request Message: 450 * - Their LinkID 451 * - Their PeerID 452 * - Their EndpointDescriptor 453 */ 454 LinkID their_link_id; 455 PeerID their_peer_id; 456 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 457 sub_buff = their_link_id.deserialize(sub_buff); 458 sub_buff = their_peer_id.deserialize(sub_buff); 459 sub_buff = their_endpoints->deserialize(sub_buff); 460 /* [ Deserialize Link-Request Message ] */ 461 462 380 463 /// only answer the first request 381 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) { 464 if (!queryRemoteLink(their_link_id).isUnspecified()) 465 { 466 467 // TODO aktuell: When will these connections be closed? 468 // ---> Close it now (if it services no links) ? 469 // (see also ! allowlink below) 470 471 // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only! 472 if ( connection->get_communication_links().size() == 0 ) 473 { 474 connection->terminate(); 475 } 476 382 477 logging_debug("Link request already received. Ignore!"); 383 478 break; … … 386 481 /// create link ids 387 482 LinkID localLink = LinkID::create(); 388 LinkID remoteLink = msg->getLocalLink();483 LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary 389 484 logging_debug( 390 485 "local=" << connection->getLocalEndpoint()->to_string() … … 394 489 // check if link creation is allowed by ALL listeners 395 490 bool allowlink = true; 396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){491 foreach( CommunicationEvents* i, eventListener ){ 397 492 allowlink &= i->onLinkRequest( localLink, 398 493 connection->getLocalEndpoint(), … … 403 498 if( !allowlink ){ 404 499 logging_warn( "Overlay denied creation of link" ); 405 delete msg;406 500 return; 407 501 } … … 411 505 ld->localLink = localLink; 412 506 ld->remoteLink = remoteLink; 413 ld->localLocator = connection->getLocalEndpoint()->clone(); 414 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 415 ld->connection = connection; 416 ld->remoteEndpoint = msg->getLocalDescriptor(); 417 add_endpoint(ld->remoteLocator); 418 419 // add layer 1-3 addresses 420 ld->remoteEndpoint.getEndpoints().add( 421 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 422 localDescriptor.getEndpoints().add( 423 connection->getLocalEndpoint(), 424 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 507 ld->localLocator = connection->getLocalEndpoint(); 508 ld->remoteLocator = connection->getRemoteEndpoint(); 509 ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints); 510 ld->set_connection(connection); 511 512 513 // update endpoints (should only have any effect in case of NAT) 514 ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 515 // localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0 425 516 426 517 // link is now up-> add it … … 428 519 addLink(ld); 429 520 430 // link is up! 431 logging_debug( "Link (initiated from remote) is up with " 432 << "local(id=" << ld->localLink.toString() << "," 433 << "locator=" << ld->localLocator->to_string() << ") " 434 << "remote(id=" << ld->remoteLink.toString() << ", " 435 << "locator=" << ld->remoteLocator->to_string() << ")" 436 ); 437 438 // sending link request reply 439 logging_debug( "Sending link request reply with ids " 440 << "local=" << localLink.toString() << ", " 441 << "remote=" << remoteLink.toString() ); 442 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink ); 443 reply.getLocalDescriptor() = localDescriptor; 444 reply.getRemoteDescriptor() = ld->remoteEndpoint; 445 446 send( &reply, *ld ); 521 522 523 /* sending link reply */ 524 logging_debug( "Sending link reply with ids " 525 << "local=" << localLink.toString() << ", " 526 << "remote=" << remoteLink.toString() ); 527 528 /* 529 * Create Link-Reply Message: 530 * - Our LinkID 531 * - Our Endpoint_Set (as update) 532 * - Their EndpointDescriptor (maybe they learn something about NAT) 533 */ 534 reboost::message_t linkmsg; 535 linkmsg.push_back(localLink.serialize()); 536 linkmsg.push_back(localDescriptor.endpoints->serialize()); 537 linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize()); 538 539 // XXX 540 cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl; 541 542 // send message 543 bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY ); 544 545 if ( ! sent ) 546 { 547 logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string()); 548 549 // TODO remove link, close link, ..? 550 551 break; 552 } 553 554 555 // link is up! 556 logging_debug( "Link (initiated from remote) is up with " 557 << "local(id=" << ld->localLink.toString() << "," 558 << "locator=" << ld->localLocator->to_string() << ") " 559 << "remote(id=" << ld->remoteLink.toString() << ", " 560 << "locator=" << ld->remoteLocator->to_string() << ")" 561 ); 447 562 448 563 // inform listeners about new open link 449 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {564 foreach( CommunicationEvents* i, eventListener ) { 450 565 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); 451 566 } … … 458 573 // handle link request reply 459 574 // --------------------------------------------------------------------- 460 case AribaBaseMsg::typeLinkReply: { 575 case AribaBaseMsg::typeLinkReply: 576 { 461 577 logging_debug( "Received link open reply for a link we initiated" ); 462 578 579 /* 580 * Deserialize Link-Reply Message: 581 * - Their LinkID 582 * - Their Endpoint_Set (as update) 583 * - Our EndpointDescriptor (maybe we can learn something about NAT) 584 */ 585 LinkID their_link_id; 586 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 587 EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet(); 588 sub_buff = their_link_id.deserialize(sub_buff); 589 sub_buff = their_endpoints->deserialize(sub_buff); 590 sub_buff = our_endpoints->deserialize(sub_buff); 591 592 463 593 // this is a reply to a link open request, so we have already 464 594 // a link mapping and can now set the remote link to valid 465 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink());595 LinkDescriptor& ld = queryLocalLink( link_id ); 466 596 467 597 // no link found-> warn! 468 598 if (ld.isUnspecified()) { 469 logging_warn("Failed to find local link " << msg->getRemoteLink().toString()); 470 delete msg; 599 logging_warn("Failed to find local link " << link_id.toString()); 471 600 return; 472 601 } 602 603 if ( ld.up ) 604 { 605 logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString()); 606 607 // TODO send LinkClose ? 608 return; 609 } 473 610 474 611 // store the connection 475 ld. connection = connection;612 ld.set_connection(connection); 476 613 477 614 // set remote locator and link id 478 ld.remoteLink = msg->getLocalLink(); 479 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 480 ld.remoteEndpoint.getEndpoints().add( 481 msg->getLocalDescriptor().getEndpoints(), 482 endpoint_set::Layer1_4 483 ); 484 485 localDescriptor.getEndpoints().add( 486 msg->getRemoteDescriptor().getEndpoints(), 487 endpoint_set::Layer1_3 488 ); 615 ld.remoteLink = their_link_id; 616 ld.remoteLocator = connection->getRemoteEndpoint(); 617 618 619 /* Update endpoints */ 620 // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information. 621 ld.remoteDescriptor.replace_endpoint_set(their_endpoints); 622 623 // add actual remote endpoint to this set (should only have any effect in case of NAT) 624 ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 625 626 // TODO In case of NAT, we could learn something about our external IP. 627 // ---> But we must trust the remote peer about this information!! 628 // localDescriptor.endpoints->add_endpoints(our_endpoints); 629 630 631 632 489 633 ld.up = true; 490 add_endpoint(ld.remoteLocator);491 634 492 635 logging_debug( "Link is now up with local id " … … 496 639 497 640 // inform lisneters about link up event 498 BOOST_FOREACH( CommunicationEvents* i, eventListener ){641 foreach( CommunicationEvents* i, eventListener ){ 499 642 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); 500 643 } … … 509 652 case AribaBaseMsg::typeLinkClose: { 510 653 // get remote link 511 const LinkID& localLink = msg->getRemoteLink();512 logging_debug( "Received link close request for link " << l ocalLink.toString() );654 // const LinkID& localLink = msg.getRemoteLink(); 655 logging_debug( "Received link close request for link " << link_id.toString() ); 513 656 514 657 // searching for link, not found-> warn 515 LinkDescriptor& linkDesc = queryLocalLink( l ocalLink);658 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 516 659 if (linkDesc.isUnspecified()) { 517 logging_warn("Failed to find local link " << localLink.toString()); 518 delete msg; 660 logging_warn("Failed to find local link " << link_id.toString()); 519 661 return; 520 662 } 521 663 522 664 // inform listeners 523 BOOST_FOREACH( CommunicationEvents* i, eventListener ){665 foreach( CommunicationEvents* i, eventListener ){ 524 666 i->onLinkDown( linkDesc.localLink, 525 667 linkDesc.localLocator, linkDesc.remoteLocator ); … … 527 669 528 670 // remove the link descriptor 529 removeLink( l ocalLink);671 removeLink( link_id ); 530 672 531 673 // done … … 534 676 535 677 // --------------------------------------------------------------------- 536 // handle link locator changes 678 // handle link locator changes -- TODO is this ever called..? 537 679 // --------------------------------------------------------------------- 538 case AribaBaseMsg::typeLinkUpdate: { 539 const LinkID& localLink = msg->getRemoteLink(); 540 logging_debug( "Received link update for link " 541 << localLink.toString() ); 542 543 // find the link description 544 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 545 if (linkDesc.isUnspecified()) { 546 logging_warn("Failed to update local link " 547 << localLink.toString()); 548 delete msg; 549 return; 550 } 551 552 // update the remote locator 553 const address_v* oldremote = linkDesc.remoteLocator; 554 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 555 556 // inform the listeners (local link has _not_ changed!) 557 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 558 i->onLinkChanged( 559 linkDesc.localLink, // linkid 560 linkDesc.localLocator, // old local 561 linkDesc.localLocator, // new local 562 oldremote, // old remote 563 linkDesc.remoteLocator // new remote 564 ); 565 } 566 567 // done 568 break; 569 } 570 } 571 572 delete msg; 680 // case AribaBaseMsg::typeLinkUpdate: { 681 // const LinkID& localLink = msg.getRemoteLink(); 682 // logging_debug( "Received link update for link " 683 // << localLink.toString() ); 684 // 685 // // find the link description 686 // LinkDescriptor& linkDesc = queryLocalLink( localLink ); 687 // if (linkDesc.isUnspecified()) { 688 // logging_warn("Failed to update local link " 689 // << localLink.toString()); 690 // return; 691 // } 692 // 693 // // update the remote locator 694 // addressing2::EndpointPtr oldremote = linkDesc.remoteLocator; 695 // linkDesc.remoteLocator = connection->getRemoteEndpoint(); 696 // 697 // // TODO update linkDesc.connection ? 698 // 699 // // inform the listeners (local link has _not_ changed!) 700 // foreach( CommunicationEvents* i, eventListener ){ 701 // i->onLinkChanged( 702 // linkDesc.localLink, // linkid 703 // linkDesc.localLocator, // old local 704 // linkDesc.localLocator, // new local 705 // oldremote, // old remote 706 // linkDesc.remoteLocator // new remote 707 // ); 708 // } 709 // 710 // // done 711 // break; 712 // } 713 714 715 default: { 716 logging_warn( "Received unknown message type!" ); 717 break; 718 } 719 720 } 573 721 } 574 722 … … 582 730 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ 583 731 if( (*i)->localLink != localLink) continue; 584 remove_endpoint((*i)->remoteLocator); 732 // remove_endpoint((*i)->remoteLocator); // XXX 585 733 delete *i; 586 734 linkSet.erase( i ); … … 605 753 } 606 754 607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {608 LinkIDs ids;609 for (size_t i=0; i<linkSet.size(); i++){610 if( addr == NULL ){611 ids.push_back( linkSet[i]->localLink );612 } else {613 if ( *linkSet[i]->remoteLocator == *addr )614 ids.push_back( linkSet[i]->localLink );615 }616 }617 return ids;618 }755 //LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { 756 // LinkIDs ids; 757 // for (size_t i=0; i<linkSet.size(); i++){ 758 // if( addr == NULL ){ 759 // ids.push_back( linkSet[i]->localLink ); 760 // } else { 761 // if ( *linkSet[i]->remoteLocator == *addr ) 762 // ids.push_back( linkSet[i]->localLink ); 763 // } 764 // } 765 // return ids; 766 //} 619 767 620 768 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){ 621 622 #ifdef UNDERLAY_OMNET623 624 // we have no mobility support for simulations625 return626 627 #endif // UNDERLAY_OMNET628 769 629 770 /*- disabled! … … 762 903 } 763 904 764 /// sends a message to all end-points in the end-point descriptor 765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 766 Data data = data_serialize(legacy_message, DEFAULT_V); 767 768 //// Adapt to new message system //// 769 // transfer data buffer ownership to the shared_buffer 770 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 771 772 reboost::message_t message; 773 message.push_back(buf); 774 775 transport->send(endpoint.getEndpoints(), message); 776 } 777 778 /// sends a message to the remote locator inside the link descriptor 779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 780 if (desc.remoteLocator==NULL) return; 781 782 Data data = data_serialize(legacy_message, DEFAULT_V); 783 784 //// Adapt to new message system //// 785 // transfer data buffer ownership to the shared_buffer 786 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 787 788 reboost::message_t message; 789 message.push_back(buf); 790 791 desc.connection->send(message); 792 } 905 906 addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link( 907 const LinkID& linkid) 908 { 909 LinkDescriptor& ld = queryLocalLink(linkid); 910 911 return ld.get_connection()->getLocalEndpoint(); 912 } 913 914 addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link( 915 const LinkID& linkid) 916 { 917 LinkDescriptor& ld = queryLocalLink(linkid); 918 919 return ld.get_connection()->getRemoteEndpoint(); 920 } 921 922 923 924 bool BaseCommunication::send_over_link( 925 const uint8_t type, 926 reboost::message_t message, 927 const LinkDescriptor& desc, 928 const uint8_t priority) 929 { 930 /* 931 * Create Link Message: 932 * - Type 933 * - Their LinkID 934 */ 935 // link id 936 message.push_front(desc.remoteLink.serialize()); 937 // type 938 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 939 /* [ Create Link Message ] */ 940 941 942 /* send message */ 943 transport_connection::sptr conn = desc.get_connection(); 944 if ( ! conn ) 945 { 946 cout << "/// MARIO: No connection!!" << endl; // XXX debug 947 return false; 948 } 949 950 // * send over connection * 951 return conn->send(message, priority); 952 } 953 954 void BaseCommunication::send_to_peer( 955 const uint8_t type, 956 const PeerID& peer_id, 957 reboost::message_t message, 958 const EndpointDescriptor& endpoint, 959 const uint8_t priority ) 960 { 961 /* 962 * Create Peer Message: 963 * - Type 964 * - Their PeerID 965 */ 966 // peer id 967 message.push_front(peer_id.serialize()); 968 // type 969 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 970 971 972 /* send message */ 973 transport->send(endpoint.getEndpoints(), message, priority); 974 } 975 976 977 793 978 794 979 }} // namespace ariba, communication -
source/ariba/communication/BaseCommunication.h
r10653 r12060 50 50 #include <boost/foreach.hpp> 51 51 52 #ifdef ECLIPSE_PARSER 53 #define foreach(a, b) for(a : b) 54 #else 55 #define foreach(a, b) BOOST_FOREACH(a, b) 56 #endif 57 52 58 // utilities 53 59 #include "ariba/utility/types.h" 54 #include "ariba/utility/messages .h"60 #include "ariba/utility/messages/MessageReceiver.h" 55 61 #include "ariba/utility/logging/Logging.h" 56 62 #include "ariba/utility/misc/Demultiplexer.hpp" … … 58 64 59 65 // new transport and addressing 60 #include "ariba/utility/addressing/addressing.hpp" 61 #include "ariba/utility/transport/transport.hpp" 62 #include "ariba/utility/transport/transport_connection.hpp" 66 #include "ariba/utility/transport/transport_peer.hpp" 67 #include "ariba/utility/transport/interfaces/transport_connection.hpp" 68 #include "ariba/utility/transport/interfaces/transport_listener.hpp" 69 #include "ariba/utility/addressing2/endpoint.hpp" 63 70 64 71 // communication … … 72 79 #include "ariba/communication/networkinfo/NetworkInformation.h" 73 80 74 // disabled75 //#ifndef UNDERLAY_OMNET76 // #include "ariba/communication/modules/transport/tcp/TCPTransport.h"77 // #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"78 // using ariba::communication::IPv4NetworkProtocol;79 // using ariba::communication::TCPTransport;80 //#endif81 82 81 namespace ariba { 83 class SideportListener;82 class SideportListener; 84 83 } 85 84 … … 87 86 namespace communication { 88 87 88 89 class communication_message_not_sent: public std::runtime_error 90 { 91 public: 92 /** Takes a character string describing the error. */ 93 explicit communication_message_not_sent(const string& __arg) : 94 std::runtime_error(__arg) 95 { 96 } 97 98 virtual ~communication_message_not_sent() throw() {} 99 }; 100 101 102 89 103 using namespace std; 90 using namespace ariba::addressing;91 104 using namespace ariba::transport; 92 105 using namespace ariba::utility; … … 102 115 * protocols and addressing schemes. 103 116 * 104 * @author Sebastian Mies, Christoph Mayer 117 * @author Sebastian Mies, Christoph Mayer, Mario Hock 105 118 */ 106 119 class BaseCommunication: 107 120 public NetworkChangeInterface, 108 public SystemEventListener,109 121 public transport_listener { 110 122 … … 120 132 121 133 /// Startup the base communication, start modules etc. 122 void start( );134 void start(addressing2::EndpointSetPtr listen_on); 123 135 124 136 /// Stops the base communication, stop modules etc. 125 137 void stop(); 126 127 /// Sets the endpoints128 void setEndpoints( string& endpoints );129 138 130 139 /// Check whether the base communication has been started up … … 147 156 * @return A sequence number for this message 148 157 */ 149 seqnum_t sendMessage(const LinkID lid, const Message* message); 158 seqnum_t sendMessage(const LinkID& lid, 159 reboost::message_t message, 160 uint8_t priority, 161 bool bypass_overlay = false) throw(communication_message_not_sent); 150 162 151 163 /** … … 164 176 * @return List of LinkID 165 177 */ 166 LinkIDs getLocalLinks(const address_v* addr) const; 178 // LinkIDs getLocalLinks(const address_v* addr) const; // XXX aktuell 167 179 168 180 /** … … 187 199 188 200 void unregisterEventListener(CommunicationEvents* _events); 189 190 /// called when a system event is emitted by system queue191 virtual void handleSystemEvent(const SystemEvent& event);192 201 193 202 /** … … 196 205 */ 197 206 virtual void receive_message(transport_connection::sptr connection, 198 reboost::message_t msg); 199 207 reboost::shared_buffer_t msg); 208 209 /** 210 * called within the ASIO thread 211 * when a connection is terminated (e.g. TCP close) 212 */ 213 virtual void connection_terminated(transport_connection::sptr connection); 214 215 addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid); 216 addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid); 217 218 200 219 protected: 201 220 … … 205 224 */ 206 225 void receiveMessage(transport_connection::sptr connection, 207 reboost::message_t msg); 226 reboost::shared_buffer_t message); 227 228 /** 229 * called within the ARIBA thread (System Queue) 230 * when a connection is terminated (e.g. TCP close) 231 */ 232 void connectionTerminated(transport_connection::sptr connection); 233 208 234 209 235 /// called when a network interface change happens … … 221 247 /// default constructor 222 248 LinkDescriptor() : 223 localLink(LinkID::UNSPECIFIED), localLocator(NULL),224 remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),249 localLink(LinkID::UNSPECIFIED), 250 remoteLink(LinkID::UNSPECIFIED), 225 251 up(false) { 226 252 } 227 253 228 ~LinkDescriptor() { 229 if (localLocator!=NULL) delete localLocator; 230 if (remoteLocator!=NULL) delete remoteLocator; 254 ~LinkDescriptor() 255 { 256 if ( connection ) 257 { 258 connection->unregister_communication_link(&localLink); 259 } 231 260 } 232 261 … … 240 269 return *unspec; 241 270 } 271 272 273 transport_connection::sptr get_connection() const 274 { 275 return connection; 276 } 277 278 void set_connection(const transport_connection::sptr& conn) 279 { 280 // unregister from old connection, 281 // if any (but normally there shouldn't..) 282 if ( connection ) 283 { 284 connection->unregister_communication_link(&localLink); 285 } 286 287 // * set_connection * 288 connection = conn; 289 290 // register this link with the connection 291 conn->register_communication_link(&localLink); 292 } 242 293 243 294 bool unspecified; … … 245 296 /// link identifiers 246 297 LinkID localLink; 247 const address_v*localLocator;298 addressing2::EndpointPtr localLocator; 248 299 249 300 /// used underlay addresses for the link 250 301 LinkID remoteLink; 251 const address_v*remoteLocator;302 addressing2::EndpointPtr remoteLocator; 252 303 253 304 /// the remote end-point descriptor 254 EndpointDescriptor remote Endpoint;305 EndpointDescriptor remoteDescriptor; 255 306 256 307 /// flag, whether this link is up 257 308 bool up; 258 309 310 311 private: 259 312 /// connection if link is up 260 313 transport_connection::sptr connection; … … 281 334 /// The local end-point descriptor 282 335 EndpointDescriptor localDescriptor; 283 284 #ifndef UNDERLAY_OMNET 336 337 /** 338 * endpoint_set holding the addresses of the "server"-sockets, 339 * ---> that should be opened 340 * 341 * (e.g. 0.0.0.0:41322) 342 */ 343 addressing2::EndpointSetPtr listenOn_endpoints; 344 345 /** 346 * endpoint_set holding the addresses of the "server"-sockets, 347 * ---> that are actually open 348 * 349 * (e.g. 0.0.0.0:41322) 350 * 351 * XXX should only be in transport_peer 352 */ 353 addressing2::EndpointSetPtr active_listenOn_endpoints; 354 355 /** 356 * endpoint_set holding the addresses of the "server"-sockets, 357 * ---> here the discovered "addressable" addresses are stored 358 * 359 * (e.g. 192.168.0.5:41322) 360 * 361 * XXX should only be in localDescriptor 362 */ 363 addressing2::EndpointSetPtr local_endpoints; 364 285 365 /// network change detector 286 366 NetworkChangeDetection networkMonitor; 287 #endif288 367 289 368 /// list of all remote addresses of links to end-points 290 class endpoint_reference { 291 public: 292 int count; ///< the number of open links to this end-point 293 const address_v* endpoint; ///< the end-point itself 294 }; 295 vector<endpoint_reference> remote_endpoints; 296 297 /// adds an end-point to the list 298 void add_endpoint( const address_v* endpoint ); 299 300 /// removes an end-point from the list 301 void remove_endpoint( const address_v* endpoint ); 369 // XXX DEPRECATED 370 // class endpoint_reference { 371 // public: 372 // int count; ///< the number of open links to this end-point 373 // const address_v* endpoint; ///< the end-point itself 374 // }; 375 // vector<endpoint_reference> remote_endpoints; 376 377 // XXX DEPRECATED 378 // /// adds an end-point to the list 379 // void add_endpoint( const address_v* endpoint ); 380 // 381 // /// removes an end-point from the list 382 // void remove_endpoint( const address_v* endpoint ); 302 383 303 384 /// event listener … … 314 395 MessageReceiver* messageReceiver; 315 396 316 /// convenience: send message to peer 317 void send( Message* message, const EndpointDescriptor& endpoint ); 318 void send( Message* message, const LinkDescriptor& descriptor ); 397 398 /* 399 * Sends a message over an existing link. 400 * ---> Adds »Link Message« Header 401 */ 402 bool send_over_link( 403 const uint8_t type, 404 reboost::message_t message, 405 const LinkDescriptor& desc, 406 const uint8_t priority); 407 408 /* 409 * Sends a message to a known peer. (To all known endpoints.) 410 * ---> Adds »Peer Message« Header 411 */ 412 void send_to_peer( 413 const uint8_t type, 414 const PeerID& peer_id, 415 reboost::message_t message, 416 const EndpointDescriptor& endpoint, 417 const uint8_t priority ); 418 319 419 320 420 /// state of the base communication -
source/ariba/communication/CommunicationEvents.cpp
r5284 r12060 49 49 50 50 bool CommunicationEvents::onLinkRequest(const LinkID& id, 51 const address_v* local, const address_v* remote) { 51 const addressing2::EndpointPtr local, 52 const addressing2::EndpointPtr remote) 53 { 52 54 return true; 53 55 } 54 56 55 void CommunicationEvents::onLinkUp(const LinkID& id, const address_v* local, 56 const address_v* remote) { 57 void CommunicationEvents::onLinkUp(const LinkID& id, 58 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 59 { 57 60 } 58 61 59 void CommunicationEvents::onLinkDown(const LinkID& id, const address_v* local, 60 const address_v* remote) { 62 void CommunicationEvents::onLinkDown(const LinkID& id, 63 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 64 { 61 65 } 62 66 63 67 void CommunicationEvents::onLinkChanged(const LinkID& id, 64 const address_v* oldlocal, const address_v* newlocal, 65 const address_v* oldremote, const address_v* newremote) { 68 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 69 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 70 { 66 71 } 67 72 68 void CommunicationEvents::onLinkFail(const LinkID& id, const address_v* local, 69 const address_v* remote) { 73 void CommunicationEvents::onLinkFail(const LinkID& id, 74 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 75 { 70 76 } 71 77 72 78 void CommunicationEvents::onLinkQoSChanged(const LinkID& id, 73 const address_v* local, const address_v* remote, const QoSParameterSet& qos) { 79 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 80 const QoSParameterSet& qos) 81 { 74 82 } 75 83 -
source/ariba/communication/CommunicationEvents.h
r5284 r12060 42 42 #include "ariba/utility/types/LinkID.h" 43 43 #include "ariba/utility/types/QoSParameterSet.h" 44 #include "ariba/utility/addressing /addressing.hpp"44 #include "ariba/utility/addressing2/endpoint.hpp" 45 45 46 46 namespace ariba { … … 49 49 using ariba::utility::LinkID; 50 50 using ariba::utility::QoSParameterSet; 51 using namespace ariba::addressing;52 51 53 52 class CommunicationEvents { … … 68 67 * @return True, if the link should be established 69 68 */ 70 virtual bool onLinkRequest(const LinkID& id, const address_v* local, 71 const address_v* remote); 69 virtual bool onLinkRequest(const LinkID& id, 70 const addressing2::EndpointPtr local, 71 const addressing2::EndpointPtr remote); 72 72 73 73 /** … … 77 77 * @param id The link id of the established link 78 78 */ 79 virtual void onLinkUp(const LinkID& id, const address_v* local,80 const address_v*remote);79 virtual void onLinkUp(const LinkID& id, 80 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 81 81 82 82 /** … … 85 85 * @param id The link identifier of the dropped link 86 86 */ 87 virtual void onLinkDown(const LinkID& id, const address_v* local,88 const address_v*remote);87 virtual void onLinkDown(const LinkID& id, 88 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 89 89 90 90 /** … … 97 97 */ 98 98 virtual void onLinkChanged(const LinkID& id, 99 const address_v* oldlocal, const address_v* newlocal, 100 const address_v* oldremote, const address_v* newremote 101 ); 99 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 100 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote); 102 101 103 virtual void onLinkFail(const LinkID& id, const address_v* local,104 const address_v*remote);102 virtual void onLinkFail(const LinkID& id, 103 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 105 104 106 virtual void onLinkQoSChanged(const LinkID& id, const address_v* local, 107 const address_v* remote, const QoSParameterSet& qos); 105 virtual void onLinkQoSChanged(const LinkID& id, 106 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 107 const QoSParameterSet& qos); 108 108 }; 109 109 -
source/ariba/communication/EndpointDescriptor.cpp
r5624 r12060 42 42 namespace communication { 43 43 44 vsznDefault(EndpointDescriptor);44 //vsznDefault(EndpointDescriptor); 45 45 46 EndpointDescriptor::EndpointDescriptor(){ 46 EndpointDescriptor::EndpointDescriptor() : 47 endpoints(new addressing2::endpoint_set()) 48 { 47 49 } 48 50 … … 55 57 } 56 58 57 EndpointDescriptor::EndpointDescriptor(const endpoint_set& endpoints ) : 58 endpoints(endpoints){ 59 EndpointDescriptor::EndpointDescriptor( 60 const PeerID& peer_id, 61 addressing2::EndpointSetPtr endpoints ) : 62 peerId(peer_id), 63 endpoints(endpoints) 64 { 59 65 } 60 66 61 EndpointDescriptor::EndpointDescriptor(const string& str) : endpoints(str){ 67 EndpointDescriptor::EndpointDescriptor(const string& str) : 68 endpoints(new addressing2::endpoint_set()) 69 { 70 cout << "ERROR: Construction of EndpointDescriptor from String is not functional!!" << endl; 71 assert( false ); 62 72 } 63 73 74 75 reboost::message_t EndpointDescriptor::serialize() const 76 { 77 reboost::message_t msg; 78 msg.push_back(peerId.serialize()); 79 msg.push_back(endpoints->serialize()); 80 81 return msg; 82 } 83 84 reboost::shared_buffer_t EndpointDescriptor::deserialize(reboost::shared_buffer_t buff) 85 { 86 buff = peerId.deserialize(buff); 87 buff = endpoints->deserialize(buff); 88 89 return buff; 90 } 91 92 64 93 }} // namespace ariba, communication -
source/ariba/communication/EndpointDescriptor.h
r7744 r12060 42 42 #include <string> 43 43 #include <set> 44 #include "ariba/utility/serialization.h"44 //#include "ariba/utility/serialization.h" 45 45 #include "ariba/utility/types/PeerID.h" 46 #include "ariba/utility/addressing/endpoint_set.hpp" 46 47 #include "ariba/utility/addressing2/endpoint_set.hpp" 48 49 // reboost messages 50 #include "ariba/utility/transport/messages/message.hpp" 51 47 52 48 53 namespace ariba { … … 51 56 using_serialization; 52 57 using namespace std; 53 using namespace ariba::addressing;54 58 using ariba::utility::PeerID; 55 59 56 class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 57 friend class BaseCommunication; 60 61 /** 62 * This class is used a transitions helper between the old addressing and 63 * serialization to the new addressing2 and the new message classes 64 * 65 * Maybe it will be replaced, or at least modified in the future. 66 */ 67 //class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 68 // friend class BaseCommunication; 69 class EndpointDescriptor 70 { 71 friend class BaseCommunication; 58 72 59 73 public: … … 68 82 69 83 /// construct end-points from an endpoint set 70 EndpointDescriptor(const endpoint_set&endpoints );84 EndpointDescriptor(const PeerID& peer_id, addressing2::EndpointSetPtr endpoints ); 71 85 86 // FIXME NOT WORKING !! 72 87 /// construct end-points from a string 73 88 EndpointDescriptor(const string& str); … … 75 90 /// convert end-points to string 76 91 string toString() const { 77 return endpoints .to_string();92 return endpoints->to_string(); 78 93 } 79 94 … … 90 105 } 91 106 92 /// create endpoint93 static EndpointDescriptor* fromString(string str) {94 return new EndpointDescriptor(str);95 }107 // /// create endpoint 108 // static EndpointDescriptor* fromString(string str) { 109 // return new EndpointDescriptor(str); 110 // } 96 111 97 112 bool operator==(const EndpointDescriptor& rh) const { … … 113 128 114 129 /// returns the end-points of this descriptor 115 endpoint_set& getEndpoints(){130 addressing2::const_EndpointSetPtr getEndpoints() const { 116 131 return endpoints; 117 132 } 118 119 /// returns the end-points of this descriptor120 const endpoint_set& getEndpoints() const{121 returnendpoints;133 134 void replace_endpoint_set(addressing2::EndpointSetPtr new_endpoints) 135 { 136 endpoints = new_endpoints; 122 137 } 123 138 124 139 /// returns a reference to the peer id 125 140 PeerID& getPeerId() { … … 132 147 return peerId; 133 148 } 149 150 /// returns a message with peerId and endpoints in it 151 reboost::message_t serialize() const; 152 153 /// deserialite peerId and endpoints 154 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff); 155 134 156 private: 135 endpoint_setendpoints;157 addressing2::EndpointSetPtr endpoints; 136 158 PeerID peerId; 137 159 }; … … 139 161 }} // namespace ariba, communication 140 162 141 sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 142 143 // serialize peer id 144 X && &peerId; 145 146 // serialize end-points 147 uint16_t len = endpoints.to_bytes_size(); 148 X && len; 149 uint8_t* buffer = X.bytes( len ); 150 if (buffer!=NULL) { 151 if (X.isDeserializer()) endpoints.assign(buffer,len); 152 else endpoints.to_bytes(buffer); 153 } 154 }sznEnd(); 163 //sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 164 // 165 // // TODO 166 // assert(false); 167 // 168 // // serialize peer id 169 // X && &peerId; 170 // 171 // // serialize end-points 172 // uint16_t len = endpoints.to_bytes_size(); 173 // X && len; 174 // uint8_t* buffer = X.bytes( len ); 175 // if (buffer!=NULL) { 176 // if (X.isDeserializer()) endpoints.assign(buffer,len); 177 // else endpoints.to_bytes(buffer); 178 // } 179 //}sznEnd(); 155 180 156 181 #endif /*ENDPOINTDESCRIPTOR_H_*/ -
source/ariba/communication/messages/AribaBaseMsg.h
r9694 r12060 42 42 #include <string> 43 43 #include <boost/cstdint.hpp> 44 #include "ariba/utility/messages.h" 44 //#include "ariba/utility/messages.h" 45 #include "ariba/utility/messages/Message.h" 45 46 #include "ariba/utility/serialization.h" 46 47 #include "ariba/utility/types/LinkID.h" … … 61 62 using_serialization; 62 63 64 // XXX This whole message is DEPRECATED 63 65 class AribaBaseMsg : public Message { 64 66 VSERIALIZEABLE; … … 69 71 typeLinkReply = 2, 70 72 typeLinkClose = 3, 71 typeLinkUpdate = 4 73 typeLinkUpdate = 4, 74 typeDirectData = 5 72 75 }; 73 76 … … 115 118 116 119 sznBeginDefault( ariba::communication::AribaBaseMsg, X ) { 117 X && type && & localLink && &remoteLink;120 X && type && &remoteLink; 118 121 if (type == typeLinkReply || type == typeLinkRequest) 119 X && localDescriptor && remoteDescriptor;120 X && Payload();122 X && &localLink && localDescriptor && remoteDescriptor; 123 // X && Payload(); 121 124 } sznEnd(); 122 125 -
source/ariba/communication/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(AribaBaseMsg.h)39 #add_headers(AribaBaseMsg.h) 40 40 41 add_sources(AribaBaseMsg.cpp)41 #add_sources(AribaBaseMsg.cpp) -
source/ariba/communication/networkinfo/AddressDiscovery.cpp
r10700 r12060 49 49 #include <ifaddrs.h> 50 50 51 #include <string> 52 #include <boost/asio/ip/address.hpp> 53 #include <boost/foreach.hpp> 54 55 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 56 #include "ariba/utility/addressing/mac_address.hpp" 57 51 58 #ifdef HAVE_LIBBLUETOOTH 52 59 #include <bluetooth/bluetooth.h> … … 58 65 namespace communication { 59 66 60 mac_address AddressDiscovery::getMacFromIF( const char* name ) { 67 68 using namespace std; 69 using namespace addressing2; 70 using namespace boost::asio::ip; 71 72 using ariba::addressing::mac_address; 73 74 mac_address getMacFromIF( const char* name ) 75 { 61 76 mac_address addr; 62 77 #ifdef HAVE_LIBBLUETOOTH … … 73 88 } 74 89 75 int AddressDiscovery::dev_info(int s, int dev_id, long arg) { 76 #ifdef HAVE_LIBBLUETOOTH 77 endpoint_set* set = (endpoint_set*)arg; 78 struct hci_dev_info di; 79 memset(&di, 0, sizeof(struct hci_dev_info)); 80 di.dev_id = dev_id; 81 if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 82 mac_address mac; 83 mac.bluetooth( di.bdaddr ); 84 address_vf vf = mac; 85 set->add(vf); 90 int dev_info(int s, int dev_id, long arg) 91 { 92 #ifdef HAVE_LIBBLUETOOTH 93 // endpoint_set* set = (endpoint_set*)arg; 94 // struct hci_dev_info di; 95 // memset(&di, 0, sizeof(struct hci_dev_info)); 96 // di.dev_id = dev_id; 97 // if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 98 // mac_address mac; 99 // mac.bluetooth( di.bdaddr ); 100 // address_vf vf = mac; 101 // set->add(vf); 86 102 #endif 87 103 return 0; 88 104 } 89 105 90 void AddressDiscovery::discover_bluetooth( endpoint_set& endpoints ) { 91 #ifdef HAVE_LIBBLUETOOTH 92 hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 93 #endif 94 } 95 96 void AddressDiscovery::discover_ip_addresses( endpoint_set& endpoints ) { 97 struct ifaddrs* ifaceBuffer = NULL; 98 void* tmpAddrPtr = NULL; 99 100 int ret = getifaddrs( &ifaceBuffer ); 101 if( ret != 0 ) return; 102 103 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { 104 105 // ignore devices that are disabled or have no ip 106 if(i == NULL) continue; 107 struct sockaddr* addr = i->ifa_addr; 108 if (addr==NULL) continue; 109 110 // ignore tun devices 111 string device = string(i->ifa_name); 112 if(device.find_first_of("tun") == 0) continue; 113 114 if (addr->sa_family == AF_INET) { 115 // look for ipv4 116 char straddr[INET_ADDRSTRLEN]; 117 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 118 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 119 ip_address ip = straddr; 120 if (ip.is_loopback()) continue; 121 address_vf vf = ip; 122 endpoints.add( vf ); 123 } else 124 if (addr->sa_family == AF_INET6) { 125 // look for ipv6 126 char straddr[INET6_ADDRSTRLEN]; 127 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 128 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 129 ip_address ip = straddr; 130 if (ip.is_loopback()) continue; 131 // if (ip.is_link_local()) continue; 132 address_vf vf = ip; 133 endpoints.add( vf ); 134 } 135 } 136 137 freeifaddrs(ifaceBuffer); 138 } 139 140 void AddressDiscovery::discover_endpoints( endpoint_set& endpoints ) { 141 discover_ip_addresses( endpoints ); 142 discover_bluetooth( endpoints ); 106 void discover_bluetooth( 107 EndpointSetPtr listenOn_endpoints, 108 EndpointSetPtr discovered_endpoints ) 109 { 110 #ifdef HAVE_LIBBLUETOOTH 111 // FIXME aktuell bluetooth 112 // hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 113 #endif 114 } 115 116 void discover_ip_addresses( 117 EndpointSetPtr listenOn_endpoints, 118 EndpointSetPtr discovered_endpoints ) 119 { 120 bool discover_ipv4 = false; 121 bool discover_ipv6 = false; 122 vector<uint16_t> ipv4_ports; 123 vector<uint16_t> ipv6_ports; 124 125 /* analyze listenOn_endpoints */ 126 BOOST_FOREACH( TcpIP_EndpointPtr endp, listenOn_endpoints->get_tcpip_endpoints() ) 127 { 128 // BRANCH: IPv4 any [0.0.0.0] 129 if ( endp->to_asio().address() == address_v4::any() ) 130 { 131 // add port 132 ipv4_ports.push_back(endp->to_asio().port()); 133 134 discover_ipv4 = true; 135 } 136 137 // BRANCH: IPv6 any [::] 138 else if ( endp->to_asio().address() == address_v6::any() ) 139 { 140 // add port 141 ipv6_ports.push_back(endp->to_asio().port()); 142 143 discover_ipv6 = true; 144 145 146 // NOTE: on linux the ipv6-any address [::] catches ipv4 as well 147 ipv4_ports.push_back(endp->to_asio().port()); 148 discover_ipv4 = true; 149 } 150 151 // BRANCH: explicit ip address 152 else 153 { 154 // ---> don't discover anything, just add it directly 155 discovered_endpoints->add_endpoint(endp); 156 } 157 } 158 159 160 /* discover addresses */ 161 if ( discover_ipv4 || discover_ipv6 ) 162 { 163 struct ifaddrs* ifaceBuffer = NULL; 164 void* tmpAddrPtr = NULL; 165 166 int ret = getifaddrs( &ifaceBuffer ); 167 if( ret != 0 ) return; 168 169 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) 170 { 171 // ignore devices that are disabled or have no ip 172 if(i == NULL) continue; 173 struct sockaddr* addr = i->ifa_addr; 174 if (addr==NULL) continue; 175 176 // // ignore tun devices // XXX why? 177 // string device = string(i->ifa_name); 178 // if(device.find_first_of("tun") == 0) continue; 179 180 // IPv4 181 if ( discover_ipv4 && addr->sa_family == AF_INET ) 182 { 183 char straddr[INET_ADDRSTRLEN]; 184 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 185 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 186 187 address ip_addr = address::from_string(straddr); 188 189 // skip loopback address 190 if ( ip_addr.to_v4() == address_v4::loopback() ) 191 continue; 192 193 // add endpoint for this address and every given ipv4 port 194 BOOST_FOREACH( uint16_t port, ipv4_ports ) 195 { 196 tcp::endpoint tcpip_endp(ip_addr, port); 197 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 198 199 discovered_endpoints->add_endpoint(endp); 200 } 201 } 202 203 // IPv6 204 else if ( discover_ipv6 && addr->sa_family == AF_INET6 ) 205 { 206 // look for ipv6 207 char straddr[INET6_ADDRSTRLEN]; 208 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 209 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 210 211 address ip_addr = address::from_string(straddr); 212 213 // skip loopback address 214 if ( ip_addr.to_v6() == address_v6::loopback() ) 215 continue; 216 217 // add endpoint for this address and every given ipv4 port 218 BOOST_FOREACH( uint16_t port, ipv6_ports ) 219 { 220 tcp::endpoint tcpip_endp(ip_addr, port); 221 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 222 223 discovered_endpoints->add_endpoint(endp); 224 } 225 } 226 } 227 228 freeifaddrs(ifaceBuffer); 229 } 230 } 231 232 233 234 EndpointSetPtr AddressDiscovery::discover_endpoints(EndpointSetPtr listenOn_endpoints) 235 { 236 EndpointSetPtr discovered_endpoints(new addressing2::endpoint_set()); 237 238 discover_ip_addresses( listenOn_endpoints, discovered_endpoints ); 239 discover_bluetooth( listenOn_endpoints, discovered_endpoints ); 240 241 return discovered_endpoints; 143 242 } 144 243 -
source/ariba/communication/networkinfo/AddressDiscovery.h
r5639 r12060 40 40 #define __ADDRESS_DISCOVERY_H 41 41 42 #include "ariba/utility/addressing/addressing.hpp" 43 44 using namespace ariba::addressing; 42 #include "ariba/utility/addressing2/endpoint_set.hpp" 45 43 46 44 namespace ariba { 47 45 namespace communication { 48 46 47 using addressing2::EndpointSetPtr; 48 49 49 class AddressDiscovery { 50 50 public: 51 static void discover_endpoints( endpoint_set& endpoints);51 static EndpointSetPtr discover_endpoints(EndpointSetPtr listenOn_endpoints); 52 52 53 53 private: 54 static mac_address getMacFromIF( const char* name ); 55 static int dev_info(int s, int dev_id, long arg); 56 static void discover_bluetooth( endpoint_set& endpoints ); 57 static void discover_ip_addresses( endpoint_set& endpoints ); 54 // TODO aktuell weg damit.. 55 // static mac_address getMacFromIF( const char* name ); 56 // static int dev_info(int s, int dev_id, long arg); 57 // static void discover_bluetooth( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 // static void discover_ip_addresses( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 59 }; 59 60
Note:
See TracChangeset
for help on using the changeset viewer.