Changeset 12060 for source/ariba/overlay
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (12 years ago)
- Location:
- source/ariba/overlay
- Files:
-
- 2 added
- 17 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/BaseOverlay.cpp
r10653 r12060 57 57 #include "ariba/utility/visual/DddVis.h" 58 58 #include "ariba/utility/visual/ServerVis.h" 59 #include <ariba/utility/misc/sha1.h> 59 60 60 61 namespace ariba { 61 62 namespace overlay { 63 64 using namespace std; 65 using ariba::transport::system_priority; 62 66 63 67 #define visualInstance ariba::utility::DddVis::instance() 64 68 #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY 65 69 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 70 71 72 // time constants (in seconds) 73 #define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s 74 75 #define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up 76 #define KEEP_ALIVE_TIME_OUT KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT // timeout: no data received on this link (incl. keep-alive messages) 77 #define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s 66 78 67 79 … … 85 97 86 98 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { 87 BOOST_FOREACH( LinkDescriptor* lp, links )99 foreach( LinkDescriptor* lp, links ) 88 100 if ((communication ? lp->communicationId : lp->overlayId) == link) 89 101 return lp; … … 92 104 93 105 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { 94 BOOST_FOREACH( const LinkDescriptor* lp, links )106 foreach( const LinkDescriptor* lp, links ) 95 107 if ((communication ? lp->communicationId : lp->overlayId) == link) 96 108 return lp; … … 122 134 123 135 /// returns a auto-link descriptor 124 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { 136 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) 137 { 125 138 // search for a descriptor that is already up 126 BOOST_FOREACH( LinkDescriptor* lp, links ) 127 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0) 128 return lp; 139 foreach( LinkDescriptor* lp, links ) 140 { 141 if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) ) 142 return lp; 143 } 144 129 145 // if not found, search for one that is about to come up... 130 BOOST_FOREACH( LinkDescriptor* lp, links ) 131 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 ) 132 return lp; 146 foreach( LinkDescriptor* lp, links ) 147 { 148 time_t now = time(NULL); 149 150 if (lp->autolink && lp->remoteNode == node && lp->service == service 151 && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT ) 152 return lp; 153 } 154 133 155 return NULL; 134 156 } … … 136 158 /// stabilizes link information 137 159 void BaseOverlay::stabilizeLinks() { 138 // send keep-alive messages over established links 139 BOOST_FOREACH( LinkDescriptor* ld, links ) { 160 time_t now = time(NULL); 161 162 // send keep-alive messages over established links 163 foreach( LinkDescriptor* ld, links ) 164 { 140 165 if (!ld->up) continue; 141 OverlayMsg msg( OverlayMsg::typeLinkAlive, 142 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 143 if (ld->relayed) msg.setRouteRecord(true); 144 send_link( &msg, ld->overlayId ); 166 167 if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME ) 168 { 169 logging_debug("[BaseOverlay] Sending KeepAlive over " 170 << ld->to_string() 171 << " after " 172 << difftime( now, ld->keepAliveSent ) 173 << "s"); 174 175 OverlayMsg msg( OverlayMsg::typeKeepAlive, 176 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 177 msg.setRouteRecord(true); 178 ld->keepAliveSent = now; 179 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 180 } 145 181 } 146 182 147 183 // iterate over all links and check for time boundaries 148 184 vector<LinkDescriptor*> oldlinks; 149 time_t now = time(NULL); 150 BOOST_FOREACH( LinkDescriptor* ld, links ) { 151 152 // keep alives and not up? yes-> link connection request is stale! 153 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) { 154 155 // increase counter 156 ld->keepAliveMissed++; 157 158 // missed more than four keep-alive messages (10 sec)? -> drop link 159 if (ld->keepAliveMissed > 4) { 160 logging_info( "Link connection request is stale, closing: " << ld ); 161 oldlinks.push_back( ld ); 162 continue; 163 } 185 foreach( LinkDescriptor* ld, links ) { 186 187 // link connection request stale? 188 if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request 189 { 190 logging_info( "Link connection request is stale, closing: " << ld ); 191 ld->failed = true; 192 oldlinks.push_back( ld ); 193 continue; 164 194 } 165 195 166 196 if (!ld->up) continue; 167 197 198 199 200 168 201 // check if link is relayed and retry connecting directly 202 // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply 169 203 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) { 170 204 ld->retryCounter--; … … 173 207 174 208 // remote used as relay flag 175 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)209 if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ?? 176 210 ld->relaying = false; 177 211 … … 183 217 184 218 // auto-link time exceeded? 185 if ( ld->autolink && difftime( now, ld->lastuse ) > 30) {219 if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) { 186 220 oldlinks.push_back( ld ); 187 221 continue; … … 189 223 190 224 // keep alives missed? yes-> 191 if ( difftime( now, ld->keepAliveTime ) > 4 ) { 192 193 // increase counter 194 ld->keepAliveMissed++; 195 196 // missed more than four keep-alive messages (4 sec)? -> drop link 197 if (ld->keepAliveMissed >= 2) { 198 logging_info( "Link is stale, closing: " << ld ); 199 oldlinks.push_back( ld ); 200 continue; 201 } 225 if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT ) 226 { 227 logging_info( "Link is stale, closing: " << ld ); 228 ld->failed = true; 229 oldlinks.push_back( ld ); 230 continue; 202 231 } 203 232 } 204 233 205 234 // drop links 206 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {235 foreach( LinkDescriptor* ld, oldlinks ) { 207 236 logging_info( "Link timed out. Dropping " << ld ); 208 237 ld->relaying = false; … … 210 239 } 211 240 212 // show link state 213 counter++; 214 if (counter>=4) showLinks(); 215 if (counter>=4 || counter<0) counter = 0; 241 242 243 244 // show link state (debug output) 245 if (counter>=10 || counter<0) 246 { 247 showLinks(); 248 counter = 0; 249 } 250 else 251 { 252 counter++; 253 } 216 254 } 217 255 … … 230 268 231 269 int i=0; 232 BOOST_FOREACH( LinkDescriptor* ld, links ) {233 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;270 foreach( LinkDescriptor* ld, links ) { 271 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue; 234 272 bool found = false; 235 BOOST_FOREACH(NodeID& id, nodes)273 foreach(NodeID& id, nodes) 236 274 if (id == ld->remoteNode) found = true; 237 275 if (found) continue; … … 261 299 int i=0; 262 300 logging_info("--- link state -------------------------------"); 263 BOOST_FOREACH( LinkDescriptor* ld, links ) {301 foreach( LinkDescriptor* ld, links ) { 264 302 string epd = ""; 265 if (ld->isDirectVital()) 266 epd = getEndpointDescriptor(ld->remoteNode).toString(); 303 if (isLinkDirectVital(ld)) 304 { 305 // epd = getEndpointDescriptor(ld->remoteNode).toString(); 306 307 epd = "Connection: "; 308 epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string(); 309 epd += " <---> "; 310 epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string(); 311 } 267 312 268 313 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd); … … 289 334 // internal message delivery --------------------------------------------------- 290 335 336 337 seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ) 338 { 339 // set priority 340 message->setPriority(priority); 341 342 // wrap old OverlayMsg into reboost message 343 reboost::message_t wrapped_message = message->wrap_up_for_sending(); 344 345 // send down to BaseCommunication 346 try 347 { 348 // * send * 349 return bc->sendMessage(bc_link, wrapped_message, priority, false); 350 } 351 catch ( communication::communication_message_not_sent& e ) 352 { 353 ostringstream out; 354 out << "Communication message not sent: " << e.what(); 355 throw message_not_sent(out.str()); 356 } 357 358 throw logic_error("This should never happen!"); 359 } 360 361 291 362 /// routes a message to its destination node 292 void BaseOverlay::route( OverlayMsg* message ) {293 363 void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop ) 364 { 294 365 // exceeded time-to-live? yes-> drop message 295 if (message->getNumHops() > message->getTimeToLive()) { 296 logging_warn("Message exceeded TTL. Dropping message and relay routes" 297 "for recovery."); 366 if (message->getNumHops() > message->getTimeToLive()) 367 { 368 logging_warn("Message exceeded TTL. Dropping message and relay routes " 369 << "for recovery. Hop count: " << (int) message->getNumHops()); 298 370 removeRelayNode(message->getDestinationNode()); 299 371 return; … … 301 373 302 374 // no-> forward message 303 else { 375 else 376 { 304 377 // destinastion myself? yes-> handle message 305 if (message->getDestinationNode() == nodeId) { 306 logging_warn("Usually I should not route messages to myself!"); 307 Message msg; 308 msg.encapsulate(message); 309 handleMessage( &msg, NULL ); 310 } else { 311 // no->send message to next hop 312 send( message, message->getDestinationNode() ); 313 } 314 } 378 if (message->getDestinationNode() == nodeId) 379 { 380 logging_warn("Usually I should not route messages to myself. And I won't!"); 381 } 382 383 // no->send message to next hop 384 else 385 { 386 try 387 { 388 /* (deep) packet inspection to determine priority */ 389 // BRANCH: typeData --> send with low priority 390 if ( message->getType() == OverlayMsg::typeData ) 391 { 392 // TODO think about implementing explicit routing queue (with active queue management??) 393 send( message, 394 message->getDestinationNode(), 395 message->getPriority(), 396 last_hop ); 397 } 398 // BRANCH: internal message --> send with higher priority 399 else 400 { 401 send( message, 402 message->getDestinationNode(), 403 system_priority::HIGH, 404 last_hop ); 405 } 406 } 407 catch ( message_not_sent& e ) 408 { 409 logging_warn("Unable to route message of type " 410 << message->getType() 411 << " to " 412 << message->getDestinationNode() 413 << ". Reason: " 414 << e.what()); 415 416 // inform sender 417 if ( message->getType() != OverlayMsg::typeMessageLost ) 418 { 419 report_lost_message(message); 420 } 421 } 422 } 423 } 424 } 425 426 void BaseOverlay::report_lost_message( const OverlayMsg* message ) 427 { 428 OverlayMsg reply(OverlayMsg::typeMessageLost); 429 reply.setSeqNum(message->getSeqNum()); 430 431 /** 432 * MessageLost-Message 433 * 434 * - Type of lost message 435 * - Hop count of lost message 436 * - Source-LinkID of lost message 437 */ 438 reboost::shared_buffer_t b(sizeof(uint8_t)*2); 439 b.mutable_data()[0] = message->getType(); 440 b.mutable_data()[1] = message->getNumHops(); 441 reply.append_buffer(b); 442 reply.append_buffer(message->getSourceLink().serialize()); 443 444 try 445 { 446 send_node(&reply, message->getSourceNode(), 447 system_priority::OVERLAY, 448 OverlayInterface::OVERLAY_SERVICE_ID); 449 } 450 catch ( message_not_sent& e ) 451 { 452 logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too."); 453 } 315 454 } 316 455 317 456 /// sends a message to another node, delivers it to the base overlay class 318 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) { 457 seqnum_t BaseOverlay::send( OverlayMsg* message, 458 const NodeID& destination, 459 uint8_t priority, 460 const NodeID& last_hop ) throw(message_not_sent) 461 { 319 462 LinkDescriptor* next_link = NULL; 320 463 321 464 // drop messages to unspecified destinations 322 if (destination.isUnspecified()) return -1; 323 324 // send messages to myself -> handle message and drop warning! 325 if (destination == nodeId) { 326 logging_warn("Sent message to myself. Handling message.") 327 Message msg; 328 msg.encapsulate(message); 329 handleMessage( &msg, NULL ); 330 return -1; 465 if (destination.isUnspecified()) 466 throw message_not_sent("No destination specified. Drop!"); 467 468 // send messages to myself -> drop! 469 // TODO maybe this is not what we want. why not just deliver this message? 470 // There is a similar check in the route function, there it should be okay. 471 if (destination == nodeId) 472 { 473 logging_warn("Sent message to myself. Drop!"); 474 475 throw message_not_sent("Sent message to myself. Drop!"); 331 476 } 332 477 333 478 // use relay path? 334 if (message->isRelayed()) { 479 if (message->isRelayed()) 480 { 335 481 next_link = getRelayLinkTo( destination ); 336 if (next_link != NULL) { 482 483 if (next_link != NULL) 484 { 337 485 next_link->setRelaying(); 338 return bc->sendMessage(next_link->communicationId, message); 339 } else { 340 logging_warn("Could not send message. No relay hop found to " 341 << destination << " -- trying to route over overlay paths ...") 342 // logging_error("ERROR: " << debugInformation() ); 343 // return -1; 344 } 345 } 346 486 487 // * send message over relayed link * 488 return send_overlaymessage_down(message, next_link->communicationId, priority); 489 } 490 else 491 { 492 logging_warn("No relay hop found to " << destination 493 << " -- trying to route over overlay paths ...") 494 } 495 } 496 497 347 498 // last resort -> route over overlay path 348 499 LinkID next_id = overlayInterface->getNextLinkId( destination ); 349 if (next_id.isUnspecified()) { 350 logging_warn("Could not send message. No next hop found to " << 351 destination ); 352 logging_error("ERROR: " << debugInformation() ); 353 return -1; 354 } 355 356 // get link descriptor, up and running? yes-> send message 500 if ( next_id.isUnspecified() ) 501 { 502 // apperently we're the closest node --> try second best node 503 // NOTE: This is helpful if our routing table is not up-to-date, but 504 // may lead to circles. So we have to be careful. 505 std::vector<const LinkID*> next_ids = 506 overlayInterface->getSortedLinkIdsTowardsNode( destination ); 507 508 for ( int i = 0; i < next_ids.size(); i++ ) 509 { 510 const LinkID& link = *next_ids[i]; 511 512 if ( ! link.isUnspecified() ) 513 { 514 next_id = link; 515 516 break; 517 } 518 } 519 520 // still no next hop found. drop. 521 if ( next_id.isUnspecified() ) 522 { 523 logging_warn("Could not send message. No next hop found to " << 524 destination ); 525 logging_error("ERROR: " << debugInformation() ); 526 527 throw message_not_sent("No next hop found."); 528 } 529 } 530 531 532 /* get link descriptor, do some checks and send message */ 357 533 next_link = getDescriptor(next_id); 358 if (next_link != NULL && next_link->up) { 359 // send message over relayed link 360 return send(message, next_link); 361 } 362 363 // no-> error, dropping message 364 else { 365 logging_warn("Could not send message. Link not known or up"); 366 logging_error("ERROR: " << debugInformation() ); 367 return -1; 368 } 369 370 // not reached-> fail 371 return -1; 372 } 534 535 // check pointer 536 if ( next_link == NULL ) 537 { 538 // NOTE: this shuldn't happen 539 throw message_not_sent("Could not send message. Link not known."); 540 } 541 542 // avoid circles 543 if ( next_link->remoteNode == last_hop ) 544 { 545 // XXX logging_debug 546 logging_info("Possible next hop would create a circle: " 547 << next_link->remoteNode); 548 549 throw message_not_sent("Could not send message. Possible next hop would create a circle."); 550 } 551 552 // check if link is up 553 if ( ! next_link->up) 554 { 555 logging_warn("Could not send message. Link not up"); 556 logging_error("ERROR: " << debugInformation() ); 557 558 throw message_not_sent("Could not send message. Link not up"); 559 } 560 561 // * send message over overlay link * 562 return send(message, next_link, priority); 563 } 564 373 565 374 566 /// send a message using a link descriptor, delivers it to the base overlay class 375 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) { 567 seqnum_t BaseOverlay::send( OverlayMsg* message, 568 LinkDescriptor* ldr, 569 uint8_t priority ) throw(message_not_sent) 570 { 376 571 // check if null 377 if (ldr == NULL) { 378 logging_error("Can not send message to " << message->getDestinationAddress()); 379 return -1; 572 if (ldr == NULL) 573 { 574 ostringstream out; 575 out << "Can not send message to " << message->getDestinationAddress(); 576 throw message_not_sent(out.str()); 380 577 } 381 578 382 579 // check if up 383 if ( !ldr->up && !ignore_down) {384 logging_error("Can not send message. Link not up:" << ldr );580 if ( !ldr->up ) 581 { 385 582 logging_error("DEBUG_INFO: " << debugInformation() ); 386 return -1; 387 } 388 LinkDescriptor* ld = NULL; 389 390 // handle relayed link 391 if (ldr->relayed) { 583 584 ostringstream out; 585 out << "Can not send message. Link not up:" << ldr->to_string(); 586 throw message_not_sent(out.str()); 587 } 588 589 LinkDescriptor* next_hop_ld = NULL; 590 591 // BRANCH: relayed link 592 if (ldr->relayed) 593 { 392 594 logging_debug("Resolving direct link for relayed link to " 393 595 << ldr->remoteNode); 394 ld = getRelayLinkTo( ldr->remoteNode ); 395 if (ld==NULL) { 396 logging_error("No relay path found to link " << ldr ); 596 597 next_hop_ld = getRelayLinkTo( ldr->remoteNode ); 598 599 if (next_hop_ld==NULL) 600 { 397 601 logging_error("DEBUG_INFO: " << debugInformation() ); 398 return -1; 399 } 400 ld->setRelaying(); 602 603 ostringstream out; 604 out << "No relay path found to link: " << ldr; 605 throw message_not_sent(out.str()); 606 } 607 608 next_hop_ld->setRelaying(); 401 609 message->setRelayed(true); 402 } else 403 ld = ldr; 404 405 // handle direct link 406 if (ld->communicationUp) { 407 logging_debug("send(): Sending message over direct link."); 408 return bc->sendMessage( ld->communicationId, message ); 409 } else { 410 logging_error("send(): Could not send message. " 411 "Not a relayed link and direct link is not up."); 412 return -1; 413 } 414 return -1; 610 } 611 // BRANCH: direct link 612 else 613 { 614 next_hop_ld = ldr; 615 } 616 617 618 // check next hop-link 619 if ( ! next_hop_ld->communicationUp) 620 { 621 throw message_not_sent( "send(): Could not send message." 622 " Not a relayed link and direct link is not up." ); 623 } 624 625 // send over next link 626 logging_debug("send(): Sending message over direct link."); 627 return send_overlaymessage_down(message, next_hop_ld->communicationId, priority); 628 415 629 } 416 630 417 631 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, 418 const ServiceID& service) { 632 uint8_t priority, const ServiceID& service) throw(message_not_sent) 633 { 419 634 message->setSourceNode(nodeId); 420 635 message->setDestinationNode(remote); 421 636 message->setService(service); 422 return send( message, remote ); 423 } 424 425 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) { 637 return send( message, remote, priority ); 638 } 639 640 void BaseOverlay::send_link( OverlayMsg* message, 641 const LinkID& link, 642 uint8_t priority ) throw(message_not_sent) 643 { 426 644 LinkDescriptor* ld = getDescriptor(link); 427 if (ld==NULL) { 428 logging_error("Cannot find descriptor to link id=" << link.toString()); 429 return -1; 430 } 645 if (ld==NULL) 646 { 647 throw message_not_sent("Cannot find descriptor to link id=" + link.toString()); 648 } 649 431 650 message->setSourceNode(nodeId); 432 651 message->setDestinationNode(ld->remoteNode); … … 437 656 message->setService(ld->service); 438 657 message->setRelayed(ld->relayed); 439 return send( message, ld, ignore_down ); 658 659 660 try 661 { 662 // * send message * 663 send( message, ld, priority ); 664 } 665 catch ( message_not_sent& e ) 666 { 667 // drop failed link 668 ld->failed = true; 669 dropLink(ld->overlayId); 670 } 440 671 } 441 672 … … 451 682 // relay link still used and alive? 452 683 if (ld==NULL 453 || !ld->isDirectVital() 454 || difftime(route.used, time(NULL)) > 8) { 684 || !isLinkDirectVital(ld) 685 || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better? 686 { 455 687 logging_info("Forgetting relay information to node " 456 688 << route.node.toString() ); … … 488 720 if (message->isRelayed()) { 489 721 // try to find source node 490 BOOST_FOREACH( relay_route& route, relay_routes ) {722 foreach( relay_route& route, relay_routes ) { 491 723 // relay route found? yes-> 492 724 if ( route.node == message->getDestinationNode() ) { … … 504 736 505 737 // try to find source node 506 BOOST_FOREACH( relay_route& route, relay_routes ) {738 foreach( relay_route& route, relay_routes ) { 507 739 508 740 // relay route found? yes-> … … 516 748 if (route.hops > message->getNumHops() 517 749 || rld == NULL 518 || ! rld->isDirectVital()) {750 || !isLinkDirectVital(ld)) { 519 751 logging_info("Updating relay information to node " 520 752 << route.node.toString() 521 << " reducing to " << message->getNumHops() << " hops.");753 << " reducing to " << (int) message->getNumHops() << " hops."); 522 754 route.hops = message->getNumHops(); 523 755 route.link = ld->overlayId; … … 542 774 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) { 543 775 // try to find source node 544 BOOST_FOREACH( relay_route& route, relay_routes ) {776 foreach( relay_route& route, relay_routes ) { 545 777 if (route.node == remote ) { 546 778 LinkDescriptor* ld = getDescriptor( route.link ); 547 if (ld==NULL || ! ld->isDirectVital()) return NULL; else {779 if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else { 548 780 route.used = time(NULL); 549 781 return ld; … … 575 807 // ---------------------------------------------------------------------------- 576 808 577 void BaseOverlay::start( BaseCommunication &_basecomm, const NodeID& _nodeid ) {809 void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) { 578 810 logging_info("Starting..."); 579 811 580 812 // set parameters 581 bc = &_basecomm;813 bc = _basecomm; 582 814 nodeId = _nodeid; 583 815 … … 587 819 588 820 // timer for auto link management 589 Timer::setInterval( 1000 ); 821 Timer::setInterval( 1000 ); // XXX 822 // Timer::setInterval( 10000 ); 590 823 Timer::start(); 591 824 … … 641 874 overlayInterface->joinOverlay(); 642 875 state = BaseOverlayStateCompleted; 643 BOOST_FOREACH( NodeListener* i, nodeListeners )876 foreach( NodeListener* i, nodeListeners ) 644 877 i->onJoinCompleted( spovnetId ); 645 878 … … 682 915 // gather all service links 683 916 vector<LinkID> servicelinks; 684 BOOST_FOREACH( LinkDescriptor* ld, links ) { 917 foreach( LinkDescriptor* ld, links ) 918 { 685 919 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) 686 920 servicelinks.push_back( ld->overlayId ); … … 688 922 689 923 // drop all service links 690 BOOST_FOREACH( LinkID lnk, servicelinks ) 691 dropLink( lnk ); 924 foreach( LinkID lnk, servicelinks ) 925 { 926 logging_debug("Dropping service link " << lnk.toString()); 927 dropLink( lnk ); 928 } 692 929 693 930 // let the node leave the spovnet overlay interface 694 931 logging_debug( "Leaving overlay" ); 695 932 if( overlayInterface != NULL ) 933 { 696 934 overlayInterface->leaveOverlay(); 935 } 697 936 698 937 // drop still open bootstrap links 699 BOOST_FOREACH( LinkID lnk, bootstrapLinks ) 700 bc->dropLink( lnk ); 938 foreach( LinkID lnk, bootstrapLinks ) 939 { 940 logging_debug("Dropping bootstrap link " << lnk.toString()); 941 bc->dropLink( lnk ); 942 } 701 943 702 944 // change to inalid state … … 708 950 709 951 // inform all registered services of the event 710 BOOST_FOREACH( NodeListener* i, nodeListeners ) { 952 foreach( NodeListener* i, nodeListeners ) 953 { 711 954 if( ret ) i->onLeaveCompleted( spovnetId ); 712 955 else i->onLeaveFailed( spovnetId ); … … 731 974 state = BaseOverlayStateInvalid; 732 975 733 BOOST_FOREACH( NodeListener* i, nodeListeners )976 foreach( NodeListener* i, nodeListeners ) 734 977 i->onJoinFailed( spovnetId ); 735 978 … … 783 1026 const ServiceID& service ) { 784 1027 1028 // TODO What if we already have a Link to this node and this service id? 1029 785 1030 // do not establish a link to myself! 786 if (remote == nodeId) return LinkID::UNSPECIFIED; 787 1031 if (remote == nodeId) return 1032 LinkID::UNSPECIFIED; 1033 1034 788 1035 // create a link descriptor 789 1036 LinkDescriptor* ld = addDescriptor(); … … 792 1039 ld->service = service; 793 1040 ld->listener = getListener(ld->service); 1041 1042 // initialize sequence numbers 1043 ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 1044 logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum); 794 1045 795 1046 // create link request message … … 800 1051 msg.setRelayed(true); 801 1052 msg.setRegisterRelay(true); 1053 // msg.setRouteRecord(true); 1054 1055 msg.setSeqNum(ld->last_sent_seqnum); 802 1056 803 1057 // debug message … … 809 1063 ); 810 1064 1065 811 1066 // sending message to node 812 send_node( &msg, ld->remoteNode, ld->service ); 813 1067 try 1068 { 1069 // * send * 1070 seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service ); 1071 } 1072 catch ( message_not_sent& e ) 1073 { 1074 logging_warn("Link request not sent: " << e.what()); 1075 1076 // Message not sent. Cancel link request. 1077 SystemQueue::instance().scheduleCall( 1078 boost::bind( 1079 &BaseOverlay::__onLinkEstablishmentFailed, 1080 this, 1081 ld->overlayId) 1082 ); 1083 } 1084 814 1085 return ld->overlayId; 815 1086 } 816 1087 1088 /// NOTE: "id" is an Overlay-LinkID 1089 void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id) 1090 { 1091 // TODO This code redundant. But also it's not easy to aggregate in one function. 1092 1093 // get descriptor for link 1094 LinkDescriptor* ld = getDescriptor(id, false); 1095 if ( ld == NULL ) return; // not found? ->ignore! 1096 1097 logging_debug( "__onLinkEstablishmentFaild: " << ld ); 1098 1099 // removing relay link information 1100 removeRelayLink(ld->overlayId); 1101 1102 // inform listeners about link down 1103 ld->communicationUp = false; 1104 if (!ld->service.isUnspecified()) 1105 { 1106 CommunicationListener* lst = getListener(ld->service); 1107 if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode ); 1108 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1109 } 1110 1111 // delete all queued messages (auto links) 1112 if( ld->messageQueue.size() > 0 ) { 1113 logging_warn( "Dropping link " << id.toString() << " that has " 1114 << ld->messageQueue.size() << " waiting messages" ); 1115 ld->flushQueue(); 1116 } 1117 1118 // erase mapping 1119 eraseDescriptor(ld->overlayId); 1120 } 1121 1122 817 1123 /// drops an established link 818 void BaseOverlay::dropLink(const LinkID& link) { 819 logging_info( "Dropping link (initiated locally):" << link.toString() ); 1124 void BaseOverlay::dropLink(const LinkID& link) 1125 { 1126 logging_info( "Dropping link: " << link.toString() ); 820 1127 821 1128 // find the link item to drop 822 1129 LinkDescriptor* ld = getDescriptor(link); 823 if( ld == NULL ) { 1130 if( ld == NULL ) 1131 { 824 1132 logging_warn( "Can't drop link, link is unknown!"); 825 1133 return; … … 827 1135 828 1136 // delete all queued messages 829 if( ld->messageQueue.size() > 0 ) { 1137 if( ld->messageQueue.size() > 0 ) 1138 { 830 1139 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " 831 1140 << ld->messageQueue.size() << " waiting messages" ); 832 1141 ld->flushQueue(); 833 1142 } 834 835 // inform sideport and listener 836 if(ld->listener != NULL) 837 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 838 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 839 840 // do not drop relay links 841 if (!ld->relaying) { 842 // drop the link in base communication 843 if (ld->communicationUp) bc->dropLink( ld->communicationId ); 844 845 // erase descriptor 846 eraseDescriptor( ld->overlayId ); 847 } else { 848 ld->dropAfterRelaying = true; 849 } 1143 1144 1145 // inform application and remote note (but only once) 1146 // NOTE: If we initiated the drop, this function is called twice, but on 1147 // the second call, there is noting to do. 1148 if ( ld->up && ! ld->failed ) 1149 { 1150 // inform sideport and listener 1151 if(ld->listener != NULL) 1152 { 1153 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 1154 } 1155 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 1156 1157 // send link-close to remote node 1158 logging_info("Sending LinkClose message to remote node."); 1159 OverlayMsg close_msg(OverlayMsg::typeLinkClose); 1160 send_link(&close_msg, link, system_priority::OVERLAY); 1161 1162 // deactivate link 1163 ld->up = false; 1164 // ld->closing = true; 1165 } 1166 1167 else if ( ld->failed ) 1168 { 1169 // inform listener 1170 if( ld->listener != NULL ) 1171 { 1172 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1173 } 1174 1175 ld->up = false; 1176 __removeDroppedLink(ld->overlayId); 1177 } 1178 } 1179 1180 /// called from typeLinkClose-handler 1181 void BaseOverlay::__removeDroppedLink(const LinkID& link) 1182 { 1183 // find the link item to drop 1184 LinkDescriptor* ld = getDescriptor(link); 1185 if( ld == NULL ) 1186 { 1187 return; 1188 } 1189 1190 // do not drop relay links 1191 if (!ld->relaying) 1192 { 1193 // drop the link in base communication 1194 if (ld->communicationUp) 1195 { 1196 bc->dropLink( ld->communicationId ); 1197 } 1198 1199 // erase descriptor 1200 eraseDescriptor( ld->overlayId ); 1201 } 1202 else 1203 { 1204 ld->dropAfterRelaying = true; 1205 } 850 1206 } 851 1207 … … 853 1209 854 1210 /// internal send message, always use this functions to send messages over links 855 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) { 1211 const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message, 1212 const LinkID& link, 1213 uint8_t priority ) throw(message_not_sent) 1214 { 856 1215 logging_debug( "Sending data message on link " << link.toString() ); 857 1216 858 1217 // get the mapping for this link 859 1218 LinkDescriptor* ld = getDescriptor(link); 860 if( ld == NULL ) { 861 logging_error("Could not send message. " 862 << "Link not found id=" << link.toString()); 863 return -1; 1219 if( ld == NULL ) 1220 { 1221 throw message_not_sent("Could not send message. Link not found id=" + link.toString()); 864 1222 } 865 1223 866 1224 // check if the link is up yet, if its an auto link queue message 867 if( !ld->up ) { 1225 if( !ld->up ) 1226 { 868 1227 ld->setAutoUsed(); 869 if( ld->autolink ) { 1228 if( ld->autolink ) 1229 { 870 1230 logging_info("Auto-link " << link.toString() << " not up, queue message"); 871 Data data = data_serialize( message ); 872 const_cast<Message*>(message)->dropPayload(); 873 ld->messageQueue.push_back( new Message(data) ); 874 } else { 875 logging_error("Link " << link.toString() << " not up, drop message"); 876 } 877 return -1; 878 } 879 880 // compile overlay message (has service and node id) 881 OverlayMsg overmsg( OverlayMsg::typeData ); 882 overmsg.encapsulate( const_cast<Message*>(message) ); 883 884 // send message over relay/direct/overlay 885 return send_link( &overmsg, ld->overlayId ); 886 } 887 888 889 seqnum_t BaseOverlay::sendMessage(const Message* message, 890 const NodeID& node, const ServiceID& service) { 1231 1232 // queue message 1233 LinkDescriptor::message_queue_entry msg; 1234 msg.message = message; 1235 msg.priority = priority; 1236 1237 ld->messageQueue.push_back( msg ); 1238 1239 return SequenceNumber::DISABLED; // TODO what to return if message is queued? 1240 } 1241 else 1242 { 1243 throw message_not_sent("Link " + link.toString() + " not up, drop message"); 1244 } 1245 } 1246 1247 // TODO AKTUELL: sequence numbers 1248 // TODO seqnum on fast path ? 1249 ld->last_sent_seqnum.increment(); 1250 1251 /* choose fast-path for direct links; normal overlay-path otherwise */ 1252 // BRANCH: direct link 1253 if ( ld->communicationUp && !ld->relayed ) 1254 { 1255 // * send down to BaseCommunication * 1256 try 1257 { 1258 bc->sendMessage(ld->communicationId, message, priority, true); 1259 } 1260 catch ( communication::communication_message_not_sent& e ) 1261 { 1262 ostringstream out; 1263 out << "Communication message on fast-path not sent: " << e.what(); 1264 throw message_not_sent(out.str()); 1265 } 1266 } 1267 1268 // BRANCH: use (slow) overlay-path 1269 else 1270 { 1271 // compile overlay message (has service and node id) 1272 OverlayMsg overmsg( OverlayMsg::typeData ); 1273 overmsg.set_payload_message(message); 1274 1275 // set SeqNum 1276 if ( ld->transmit_seqnums ) 1277 { 1278 overmsg.setSeqNum(ld->last_sent_seqnum); 1279 } 1280 logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum()); 1281 1282 // send message over relay/direct/overlay 1283 send_link( &overmsg, ld->overlayId, priority ); 1284 } 1285 1286 // return seqnum 1287 return ld->last_sent_seqnum; 1288 } 1289 1290 1291 const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message, 1292 const NodeID& node, uint8_t priority, const ServiceID& service) { 891 1293 892 1294 // find link for node and service … … 907 1309 if( ld == NULL ) { 908 1310 logging_error( "Failed to establish auto-link."); 909 return -1;1311 throw message_not_sent("Failed to establish auto-link."); 910 1312 } 911 1313 ld->autolink = true; … … 920 1322 921 1323 // send / queue message 922 return sendMessage( message, ld->overlayId );923 } 924 925 926 NodeID BaseOverlay::sendMessageCloserToNodeID( const Message*message,927 const NodeID& address, const ServiceID& service) {1324 return sendMessage( message, ld->overlayId, priority ); 1325 } 1326 1327 1328 NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message, 1329 const NodeID& address, uint8_t priority, const ServiceID& service) { 928 1330 929 1331 if ( overlayInterface->isClosestNodeTo(address) ) … … 936 1338 if ( closest_node != NodeID::UNSPECIFIED ) 937 1339 { 938 se qnum_t seqnum = sendMessage(message, closest_node, service);1340 sendMessage(message, closest_node, priority, service); 939 1341 } 940 1342 941 return closest_node; // XXXreturn seqnum ?? tuple? closest_node via (non const) reference?1343 return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference? 942 1344 } 943 1345 // ---------------------------------------------------------------------------- … … 978 1380 979 1381 // see if we can find the node in our own table 980 BOOST_FOREACH(const LinkDescriptor* ld, links){1382 foreach(const LinkDescriptor* ld, links){ 981 1383 if(ld->remoteNode != node) continue; 982 1384 if(!ld->communicationUp) continue; … … 1079 1481 1080 1482 void BaseOverlay::onLinkUp(const LinkID& id, 1081 const address_v* local, const address_v* remote) { 1483 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 1484 { 1082 1485 logging_debug( "Link up with base communication link id=" << id ); 1083 1486 … … 1085 1488 LinkDescriptor* ld = getDescriptor(id, true); 1086 1489 1087 // handle bootstrap link we initiated1490 // BRANCH: handle bootstrap link we initiated 1088 1491 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){ 1089 1492 logging_info( 1090 1493 "Join has been initiated by me and the link is now up. " << 1494 "LinkID: " << id.toString() << 1091 1495 "Sending out join request for SpoVNet " << spovnetId.toString() 1092 1496 ); … … 1096 1500 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1097 1501 JoinRequest joinRequest( spovnetId, nodeId ); 1098 overlayMsg.encapsulate( &joinRequest ); 1099 bc->sendMessage( id, &overlayMsg ); 1502 overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer()); 1503 1504 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1505 1100 1506 return; 1101 1507 } 1102 1508 1103 // no link found? ->link establishment from remote, add one!1509 // BRANCH: link establishment from remote, add one! 1104 1510 if (ld == NULL) { 1105 1511 ld = addDescriptor( id ); … … 1115 1521 // in this case, do not inform listener, since service it unknown 1116 1522 // -> wait for update message! 1117 1118 // link mapping found? -> send update message with node-id and service id 1119 } else { 1523 } 1524 1525 // BRANCH: We requested this link in the first place 1526 else 1527 { 1120 1528 logging_info( "onLinkUp descriptor (initiated locally):" << ld ); 1121 1529 … … 1126 1534 ld->fromRemote = false; 1127 1535 1128 // if link is a relayed link->convert to direct link 1129 if (ld->relayed) { 1130 logging_info( "Converting to direct link: " << ld ); 1536 // BRANCH: this was a relayed link before --> convert to direct link 1537 // TODO do we really have to send a message here? 1538 if (ld->relayed) 1539 { 1131 1540 ld->up = true; 1132 1541 ld->relayed = false; 1542 logging_info( "Converting to direct link: " << ld ); 1543 1544 // send message 1133 1545 OverlayMsg overMsg( OverlayMsg::typeLinkDirect ); 1134 1546 overMsg.setSourceLink( ld->overlayId ); 1135 1547 overMsg.setDestinationLink( ld->remoteLink ); 1136 send_link( &overMsg, ld->overlayId ); 1137 } else { 1548 send_link( &overMsg, ld->overlayId, system_priority::OVERLAY ); 1549 1550 // inform listener 1551 if( ld->listener != NULL) 1552 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1553 } 1554 1555 1556 /* NOTE: Chord is opening direct-links in it's setup routine which are 1557 * neither set to "relayed" nor to "up". To activate these links a 1558 * typeLinkUpdate must be sent. 1559 * 1560 * This branch is would also be taken when we had a working link before 1561 * (ld->up == true). I'm not sure if this case does actually happen 1562 * and whether it's tested. 1563 */ 1564 else 1565 { 1138 1566 // note: necessary to validate the link on the remote side! 1139 1567 logging_info( "Sending out update" << … … 1144 1572 // compile and send update message 1145 1573 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate ); 1146 overlayMsg.setSourceLink(ld->overlayId);1147 1574 overlayMsg.setAutoLink( ld->autolink ); 1148 send_link( &overlayMsg, ld->overlayId, true ); 1575 overlayMsg.setSourceNode(nodeId); 1576 overlayMsg.setDestinationNode(ld->remoteNode); 1577 overlayMsg.setSourceLink(ld->overlayId); 1578 overlayMsg.setDestinationLink(ld->remoteLink); 1579 overlayMsg.setService(ld->service); 1580 overlayMsg.setRelayed(false); 1581 1582 // TODO ld->communicationId = id ?? 1583 1584 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1149 1585 } 1150 1586 } … … 1152 1588 1153 1589 void BaseOverlay::onLinkDown(const LinkID& id, 1154 const address_v* local, const address_v* remote) { 1155 1590 const addressing2::EndpointPtr local, 1591 const addressing2::EndpointPtr remote) 1592 { 1156 1593 // erase bootstrap links 1157 1594 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); … … 1185 1622 } 1186 1623 1624 1625 void BaseOverlay::onLinkFail(const LinkID& id, 1626 const addressing2::EndpointPtr local, 1627 const addressing2::EndpointPtr remote) 1628 { 1629 logging_debug( "Link fail with base communication link id=" << id ); 1630 1631 // // erase bootstrap links 1632 // vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1633 // if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1634 // 1635 // // get descriptor for link 1636 // LinkDescriptor* ld = getDescriptor(id, true); 1637 // if ( ld == NULL ) return; // not found? ->ignore! 1638 // logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1639 // 1640 // // inform listeners 1641 // ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1642 // sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1643 1644 logging_debug( " ... calling onLinkDown ..." ); 1645 onLinkDown(id, local, remote); 1646 } 1647 1648 1187 1649 void BaseOverlay::onLinkChanged(const LinkID& id, 1188 const address_v* oldlocal, const address_v* newlocal, 1189 const address_v* oldremote, const address_v* newremote) { 1190 1191 // get descriptor for link 1192 LinkDescriptor* ld = getDescriptor(id, true); 1193 if ( ld == NULL ) return; // not found? ->ignore! 1194 logging_debug( "onLinkChanged descriptor: " << ld ); 1195 1196 // inform listeners 1197 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1198 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1199 1200 // autolinks: refresh timestamp 1201 ld->setAutoUsed(); 1202 } 1203 1204 void BaseOverlay::onLinkFail(const LinkID& id, 1205 const address_v* local, const address_v* remote) { 1206 logging_debug( "Link fail with base communication link id=" << id ); 1207 1208 // erase bootstrap links 1209 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1210 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1211 1212 // get descriptor for link 1213 LinkDescriptor* ld = getDescriptor(id, true); 1214 if ( ld == NULL ) return; // not found? ->ignore! 1215 logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1216 1217 // inform listeners 1218 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1219 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1220 } 1221 1222 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local, 1223 const address_v* remote, const QoSParameterSet& qos) { 1224 logging_debug( "Link quality changed with base communication link id=" << id ); 1225 1226 // get descriptor for link 1227 LinkDescriptor* ld = getDescriptor(id, true); 1228 if ( ld == NULL ) return; // not found? ->ignore! 1229 logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1230 } 1231 1232 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local, 1233 const address_v* remote ) { 1650 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 1651 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 1652 { 1653 // get descriptor for link 1654 LinkDescriptor* ld = getDescriptor(id, true); 1655 if ( ld == NULL ) return; // not found? ->ignore! 1656 logging_debug( "onLinkChanged descriptor: " << ld ); 1657 1658 // inform listeners 1659 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1660 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1661 1662 // autolinks: refresh timestamp 1663 ld->setAutoUsed(); 1664 } 1665 1666 //void BaseOverlay::onLinkQoSChanged(const LinkID& id, 1667 // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 1668 // const QoSParameterSet& qos) 1669 //{ 1670 // logging_debug( "Link quality changed with base communication link id=" << id ); 1671 // 1672 // // get descriptor for link 1673 // LinkDescriptor* ld = getDescriptor(id, true); 1674 // if ( ld == NULL ) return; // not found? ->ignore! 1675 // logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1676 //} 1677 1678 bool BaseOverlay::onLinkRequest(const LinkID& id, 1679 const addressing2::EndpointPtr local, 1680 const addressing2::EndpointPtr remote) 1681 { 1234 1682 logging_debug("Accepting link request from " << remote->to_string() ); 1683 1684 // TODO ask application..? 1685 1235 1686 return true; 1236 1687 } 1237 1688 1689 1690 1691 1238 1692 /// handles a message from base communication 1239 bool BaseOverlay::receiveMessage(const Message* message, 1240 const LinkID& link, const NodeID& ) { 1693 bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message, 1694 const LinkID& link, 1695 const NodeID&, 1696 bool bypass_overlay ) 1697 { 1241 1698 // get descriptor for link 1242 1699 LinkDescriptor* ld = getDescriptor( link, true ); 1243 return handleMessage( message, ld, link ); 1700 1701 1702 /* choose fastpath for direct links; normal overlay-path otherwise */ 1703 if ( bypass_overlay && ld ) 1704 { 1705 // message received --> link is alive 1706 ld->keepAliveReceived = time(NULL); 1707 // hop count on this link 1708 ld->hops = 0; 1709 1710 1711 // hand over to CommunicationListener (aka Application) 1712 CommunicationListener* lst = getListener(ld->service); 1713 if ( lst != NULL ) 1714 { 1715 lst->onMessage( 1716 message, 1717 ld->remoteNode, 1718 ld->overlayId, 1719 SequenceNumber::DISABLED, 1720 NULL ); 1721 1722 return true; 1723 } 1724 1725 return false; 1726 } 1727 else 1728 { 1729 return handleMessage( message, ld, link ); 1730 } 1244 1731 } 1245 1732 … … 1247 1734 1248 1735 /// Handle spovnet instance join requests 1249 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {1250 1736 bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ) 1737 { 1251 1738 // decapsulate message 1252 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 1739 JoinRequest joinReq; 1740 joinReq.deserialize_from_shared_buffer(message); 1741 1253 1742 logging_info( "Received join request for spovnet " << 1254 joinReq ->getSpoVNetID().toString() );1743 joinReq.getSpoVNetID().toString() ); 1255 1744 1256 1745 // check spovnet id 1257 if( joinReq ->getSpoVNetID() != spovnetId ) {1746 if( joinReq.getSpoVNetID() != spovnetId ) { 1258 1747 logging_error( 1259 1748 "Received join request for spovnet we don't handle " << 1260 joinReq ->getSpoVNetID().toString() );1261 delete joinReq; 1749 joinReq.getSpoVNetID().toString() ); 1750 1262 1751 return false; 1263 1752 } … … 1267 1756 logging_info( "Sending join reply for spovnet " << 1268 1757 spovnetId.toString() << " to node " << 1269 overlayMsg->getSourceNode().toString() <<1758 source.toString() << 1270 1759 ". Result: " << (allow ? "allowed" : "denied") ); 1271 joiningNodes.push_back( overlayMsg->getSourceNode());1760 joiningNodes.push_back( source ); 1272 1761 1273 1762 // return overlay parameters … … 1276 1765 << getEndpointDescriptor().toString() ) 1277 1766 OverlayParameterSet parameters = overlayInterface->getParameters(); 1767 1768 1769 // create JoinReplay Message 1278 1770 OverlayMsg retmsg( OverlayMsg::typeJoinReply, 1279 1771 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1280 JoinReply replyMsg( spovnetId, parameters, 1281 allow, getEndpointDescriptor() ); 1282 retmsg.encapsulate(&replyMsg); 1283 bc->sendMessage( bcLink, &retmsg ); 1284 1285 delete joinReq; 1772 JoinReply replyMsg( spovnetId, parameters, allow ); 1773 retmsg.append_buffer(replyMsg.serialize_into_shared_buffer()); 1774 1775 // XXX This is unlovely clash between the old message system and the new one, 1776 // but a.t.m. we can't migrate everything to the new system at once.. 1777 // ---> Consider the EndpointDescriptor as part of the JoinReply.. 1778 retmsg.append_buffer(getEndpointDescriptor().serialize()); 1779 1780 // * send * 1781 send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY); 1782 1286 1783 return true; 1287 1784 } 1288 1785 1289 1786 /// Handle replies to spovnet instance join requests 1290 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) { 1787 bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ) 1788 { 1291 1789 // decapsulate message 1292 1790 logging_debug("received join reply message"); 1293 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 1791 JoinReply replyMsg; 1792 EndpointDescriptor endpoints; 1793 reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message); 1794 buff = endpoints.deserialize(buff); 1294 1795 1295 1796 // correct spovnet? 1296 if( replyMsg ->getSpoVNetID() != spovnetId ) { // no-> fail1797 if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail 1297 1798 logging_error( "Received SpoVNet join reply for " << 1298 replyMsg ->getSpoVNetID().toString() <<1799 replyMsg.getSpoVNetID().toString() << 1299 1800 " != " << spovnetId.toString() ); 1300 delete replyMsg; 1801 1301 1802 return false; 1302 1803 } 1303 1804 1304 1805 // access granted? no -> fail 1305 if( !replyMsg ->getJoinAllowed() ) {1806 if( !replyMsg.getJoinAllowed() ) { 1306 1807 logging_error( "Our join request has been denied" ); 1307 1808 … … 1317 1818 1318 1819 // inform all registered services of the event 1319 BOOST_FOREACH( NodeListener* i, nodeListeners )1820 foreach( NodeListener* i, nodeListeners ) 1320 1821 i->onJoinFailed( spovnetId ); 1321 1822 1322 delete replyMsg;1323 1823 return true; 1324 1824 } … … 1329 1829 1330 1830 logging_debug( "Using bootstrap end-point " 1331 << replyMsg->getBootstrapEndpoint().toString() );1831 << endpoints.toString() ); 1332 1832 1333 1833 // create overlay structure from spovnet parameter set … … 1338 1838 1339 1839 overlayInterface = OverlayFactory::create( 1340 *this, replyMsg ->getParam(), nodeId, this );1840 *this, replyMsg.getParam(), nodeId, this ); 1341 1841 1342 1842 // overlay structure supported? no-> fail! … … 1354 1854 1355 1855 // inform all registered services of the event 1356 BOOST_FOREACH( NodeListener* i, nodeListeners )1856 foreach( NodeListener* i, nodeListeners ) 1357 1857 i->onJoinFailed( spovnetId ); 1358 1858 1359 delete replyMsg;1360 1859 return true; 1361 1860 } … … 1365 1864 overlayInterface->createOverlay(); 1366 1865 1367 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint());1368 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint());1866 overlayInterface->joinOverlay( endpoints ); 1867 overlayBootstrap.recordJoin( endpoints ); 1369 1868 1370 1869 // update ovlvis … … 1372 1871 1373 1872 // inform all registered services of the event 1374 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1375 i->onJoinCompleted( spovnetId ); 1376 1377 delete replyMsg; 1378 1379 } else { 1380 1873 foreach( NodeListener* i, nodeListeners ) 1874 i->onJoinCompleted( spovnetId ); 1875 } 1876 else 1877 { 1381 1878 // this is not the first bootstrap, just join the additional node 1382 1879 logging_debug("not first-time bootstrapping"); 1383 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1384 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); 1385 1386 delete replyMsg; 1387 1880 overlayInterface->joinOverlay( endpoints ); 1881 overlayBootstrap.recordJoin( endpoints ); 1388 1882 } // if( overlayInterface == NULL ) 1389 1883 … … 1392 1886 1393 1887 1394 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1888 bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1889 { 1395 1890 // get service 1396 const ServiceID& service = overlayMsg->getService(); 1891 const ServiceID& service = ld->service; //overlayMsg->getService(); 1892 1397 1893 logging_debug( "Received data for service " << service.toString() 1398 1894 << " on link " << overlayMsg->getDestinationLink().toString() ); … … 1402 1898 if(lst != NULL){ 1403 1899 lst->onMessage( 1404 overlayMsg, 1405 overlayMsg->getSourceNode(), 1406 overlayMsg->getDestinationLink() 1900 message, 1901 // overlayMsg->getSourceNode(), 1902 // overlayMsg->getDestinationLink(), 1903 ld->remoteNode, 1904 ld->overlayId, 1905 overlayMsg->getSeqNum(), 1906 overlayMsg 1407 1907 ); 1408 1908 } … … 1411 1911 } 1412 1912 1913 bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg ) 1914 { 1915 /** 1916 * Deserialize MessageLost-Message 1917 * 1918 * - Type of lost message 1919 * - Hop count of lost message 1920 * - Source-LinkID of lost message 1921 */ 1922 const uint8_t* buff = message(0, sizeof(uint8_t)*2).data(); 1923 uint8_t type = buff[0]; 1924 uint8_t hops = buff[1]; 1925 LinkID linkid; 1926 linkid.deserialize(message(sizeof(uint8_t)*2)); 1927 1928 logging_warn("Node " << msg->getSourceNode() 1929 << " informed us, that our message of type " << (int) type 1930 << " is lost after traveling " << (int) hops << " hops." 1931 << " (LinkID: " << linkid.toString()); 1932 1933 1934 // TODO switch-case ? 1935 1936 // BRANCH: LinkRequest --> link request failed 1937 if ( type == OverlayMsg::typeLinkRequest ) 1938 { 1939 __onLinkEstablishmentFailed(linkid); 1940 } 1941 1942 // BRANCH: Data --> link disrupted. Drop link. 1943 // (We could use something more advanced here. e.g. At least send a 1944 // keep-alive message and wait for a keep-alive reply.) 1945 if ( type == OverlayMsg::typeData ) 1946 { 1947 LinkDescriptor* link_desc = getDescriptor(linkid); 1948 1949 if ( link_desc ) 1950 { 1951 link_desc->failed = true; 1952 } 1953 1954 dropLink(linkid); 1955 } 1956 1957 // BRANCH: ping lost 1958 if ( type == OverlayMsg::typePing ) 1959 { 1960 CommunicationListener* lst = getListener(msg->getService()); 1961 if( lst != NULL ) 1962 { 1963 lst->onPingLost(msg->getSourceNode()); 1964 } 1965 } 1966 1967 return true; 1968 } 1969 1970 bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1971 { 1972 // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node) 1973 1974 bool send_pong = false; 1975 1976 // inform application and ask permission to send a pong message 1977 CommunicationListener* lst = getListener(overlayMsg->getService()); 1978 if( lst != NULL ) 1979 { 1980 send_pong = lst->onPing(overlayMsg->getSourceNode()); 1981 } 1982 1983 // send pong message if allowed 1984 if ( send_pong ) 1985 { 1986 OverlayMsg pong_msg(OverlayMsg::typePong); 1987 pong_msg.setSeqNum(overlayMsg->getSeqNum()); 1988 1989 // send message 1990 try 1991 { 1992 send_node( &pong_msg, 1993 overlayMsg->getSourceNode(), 1994 system_priority::OVERLAY, 1995 overlayMsg->getService() ); 1996 } 1997 catch ( message_not_sent& e ) 1998 { 1999 logging_info("Could not send Pong-Message to node: " << 2000 overlayMsg->getSourceNode()); 2001 } 2002 } 2003 } 2004 2005 bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2006 { 2007 // inform application 2008 CommunicationListener* lst = getListener(overlayMsg->getService()); 2009 if( lst != NULL ) 2010 { 2011 lst->onPong(overlayMsg->getSourceNode()); 2012 } 2013 } 1413 2014 1414 2015 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { … … 1439 2040 overlayMsg->setSourceLink(ld->overlayId); 1440 2041 overlayMsg->setService(ld->service); 1441 send( overlayMsg, ld );2042 send( overlayMsg, ld, system_priority::OVERLAY ); 1442 2043 } 1443 2044 … … 1481 2082 if( ld->messageQueue.size() > 0 ) { 1482 2083 logging_info( "Sending out queued messages on link " << ld ); 1483 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1484 sendMessage( msg, ld->overlayId ); 1485 delete msg;1486 2084 foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue ) 2085 { 2086 sendMessage( msg.message, ld->overlayId, msg.priority ); 2087 } 1487 2088 ld->messageQueue.clear(); 1488 2089 } … … 1497 2098 /// handle a link request and reply 1498 2099 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1499 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );1500 2100 1501 2101 //TODO: Check if a request has already been sent using getSourceLink() ... … … 1514 2114 ldn->remoteNode = overlayMsg->getSourceNode(); 1515 2115 ldn->remoteLink = overlayMsg->getSourceLink(); 1516 2116 ldn->hops = overlayMsg->getNumHops(); 2117 2118 // initialize sequence numbers 2119 ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 2120 logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum); 2121 2122 1517 2123 // update time-stamps 1518 2124 ldn->setAlive(); 1519 2125 ldn->setAutoUsed(); 1520 2126 2127 logging_info( "Link request received from node id=" 2128 << overlayMsg->getSourceNode() 2129 << " LINK: " 2130 << ldn); 2131 1521 2132 // create reply message and send back! 1522 2133 overlayMsg->swapRoles(); // swap source/destination 1523 2134 overlayMsg->setType(OverlayMsg::typeLinkReply); 1524 2135 overlayMsg->setSourceLink(ldn->overlayId); 1525 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );1526 2136 overlayMsg->setRelayed(true); 1527 send( overlayMsg, ld ); // send back to link 2137 // overlayMsg->setRouteRecord(true); 2138 overlayMsg->setSeqNum(ld->last_sent_seqnum); 2139 2140 // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!! 2141 // append our endpoints (for creation of a direct link) 2142 overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize()); 2143 2144 send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link 1528 2145 1529 2146 // inform listener … … 1534 2151 } 1535 2152 1536 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1537 2153 bool BaseOverlay::handleLinkReply( 2154 OverlayMsg* overlayMsg, 2155 reboost::shared_buffer_t sub_message, 2156 LinkDescriptor* ld ) 2157 { 2158 // deserialize EndpointDescriptor 2159 EndpointDescriptor endpoints; 2160 endpoints.deserialize(sub_message); 2161 1538 2162 // find link request 1539 2163 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink()); … … 1554 2178 1555 2179 // debug message 1556 logging_ debug( "Link request reply received. Establishing link"2180 logging_info( "Link request reply received. Establishing link" 1557 2181 << " for service " << overlayMsg->getService().toString() 1558 2182 << " with local id=" << overlayMsg->getDestinationLink() 1559 2183 << " and remote link id=" << overlayMsg->getSourceLink() 1560 << " to " << overlayMsg->getSourceEndpoint().toString() 2184 << " to " << endpoints.toString() 2185 << " hop count: " << overlayMsg->getRouteRecord().size() 1561 2186 ); 1562 2187 … … 1577 2202 logging_info( "Sending out queued messages on link " << 1578 2203 ldn->overlayId.toString() ); 1579 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {1580 sendMessage( msg, ldn->overlayId );1581 delete msg;2204 foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue ) 2205 { 2206 sendMessage( msg.message, ldn->overlayId, msg.priority ); 1582 2207 } 1583 2208 ldn->messageQueue.clear(); … … 1589 2214 // try to replace relay link with direct link 1590 2215 ldn->retryCounter = 3; 1591 ldn->endpoint = overlayMsg->getSourceEndpoint();2216 ldn->endpoint = endpoints; 1592 2217 ldn->communicationId = bc->establishLink( ldn->endpoint ); 1593 2218 … … 1596 2221 1597 2222 /// handle a keep-alive message for a link 1598 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 2223 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2224 { 1599 2225 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink()); 1600 if ( rld != NULL ) { 1601 logging_debug("Keep-Alive for " << 1602 overlayMsg->getDestinationLink() ); 2226 2227 if ( rld != NULL ) 2228 { 2229 logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() ); 1603 2230 if (overlayMsg->isRouteRecord()) 2231 { 1604 2232 rld->routeRecord = overlayMsg->getRouteRecord(); 2233 } 2234 2235 // set alive 1605 2236 rld->setAlive(); 2237 2238 2239 /* answer keep alive */ 2240 if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive ) 2241 { 2242 time_t now = time(NULL); 2243 logging_debug("[BaseOverlay] Answering KeepAlive over " 2244 << ld->to_string() 2245 << " after " 2246 << difftime( now, ld->keepAliveSent ) 2247 << "s"); 2248 2249 OverlayMsg msg( OverlayMsg::typeKeepAliveReply, 2250 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 2251 msg.setRouteRecord(true); 2252 ld->keepAliveSent = now; 2253 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 2254 } 2255 1606 2256 return true; 1607 } else { 1608 logging_error("Keep-Alive for " 2257 } 2258 else 2259 { 2260 logging_error("No Keep-Alive for " 1609 2261 << overlayMsg->getDestinationLink() << ": link unknown." ); 1610 2262 return false; … … 1636 2288 // erase the original descriptor 1637 2289 eraseDescriptor(ld->overlayId); 2290 2291 // inform listener 2292 if( rld->listener != NULL) 2293 rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode ); 2294 1638 2295 return true; 1639 2296 } 1640 2297 1641 2298 /// handles an incoming message 1642 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld, 1643 const LinkID bcLink ) { 1644 logging_debug( "Handling message: " << message->toString()); 1645 2299 bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, 2300 const LinkID bcLink ) 2301 { 1646 2302 // decapsulate overlay message 1647 OverlayMsg* overlayMsg = 1648 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1649 if( overlayMsg == NULL ) return false; 1650 2303 OverlayMsg* overlayMsg = new OverlayMsg(); 2304 reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message); 2305 2306 // // XXX debug 2307 // logging_info( "Received overlay message." 2308 // << " Hops: " << (int) overlayMsg->getNumHops() 2309 // << " Type: " << (int) overlayMsg->getType() 2310 // << " Payload size: " << sub_buff.size() 2311 // << " SeqNum: " << overlayMsg->getSeqNum() ); 2312 2313 1651 2314 // increase number of hops 1652 2315 overlayMsg->increaseNumHops(); … … 1660 2323 // handle signaling messages (do not route!) 1661 2324 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && 1662 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { 1663 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 2325 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) 2326 { 2327 overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 1664 2328 delete overlayMsg; 1665 2329 return true; … … 1673 2337 << " to " << overlayMsg->getDestinationNode() 1674 2338 ); 1675 route( overlayMsg ); 2339 2340 // // XXX testing AKTUELL 2341 // logging_info("MARIO: Routing message " 2342 // << " from " << overlayMsg->getSourceNode() 2343 // << " to " << overlayMsg->getDestinationNode() ); 2344 // logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size()); 2345 overlayMsg->append_buffer(sub_buff); 2346 2347 route( overlayMsg, ld->remoteNode ); 1676 2348 delete overlayMsg; 1677 2349 return true; 1678 2350 } 1679 2351 1680 // handle base overlay message 2352 2353 /* handle base overlay message */ 1681 2354 bool ret = false; // return value 1682 switch ( overlayMsg->getType() ) { 1683 1684 // data transport messages 1685 case OverlayMsg::typeData: 1686 ret = handleData(overlayMsg, ld); break; 1687 1688 // overlay setup messages 1689 case OverlayMsg::typeJoinRequest: 1690 ret = handleJoinRequest(overlayMsg, bcLink ); break; 1691 case OverlayMsg::typeJoinReply: 1692 ret = handleJoinReply(overlayMsg, bcLink ); break; 1693 1694 // link specific messages 1695 case OverlayMsg::typeLinkRequest: 1696 ret = handleLinkRequest(overlayMsg, ld ); break; 1697 case OverlayMsg::typeLinkReply: 1698 ret = handleLinkReply(overlayMsg, ld ); break; 1699 case OverlayMsg::typeLinkUpdate: 1700 ret = handleLinkUpdate(overlayMsg, ld ); break; 1701 case OverlayMsg::typeLinkAlive: 1702 ret = handleLinkAlive(overlayMsg, ld ); break; 1703 case OverlayMsg::typeLinkDirect: 1704 ret = handleLinkDirect(overlayMsg, ld ); break; 1705 1706 // handle unknown message type 1707 default: { 1708 logging_error( "received message in invalid state! don't know " << 1709 "what to do with this message of type " << overlayMsg->getType() ); 1710 ret = false; 1711 break; 1712 } 2355 try 2356 { 2357 switch ( overlayMsg->getType() ) 2358 { 2359 // data transport messages 2360 case OverlayMsg::typeData: 2361 { 2362 // NOTE: On relayed links, »ld« does not point to our link, but on the relay link. 2363 LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink()); 2364 2365 if ( ! end_to_end_ld ) 2366 { 2367 logging_warn("Error: Data-Message claims to belong to a link we don't know."); 2368 2369 ret = false; 2370 } 2371 else 2372 { 2373 // message received --> link is alive 2374 end_to_end_ld->keepAliveReceived = time(NULL); 2375 // hop count on this link 2376 end_to_end_ld->hops = overlayMsg->getNumHops(); 2377 2378 // * call handler * 2379 ret = handleData(sub_buff, overlayMsg, end_to_end_ld); 2380 } 2381 2382 break; 2383 } 2384 case OverlayMsg::typeMessageLost: 2385 ret = handleLostMessage(sub_buff, overlayMsg); 2386 2387 break; 2388 2389 // overlay setup messages 2390 case OverlayMsg::typeJoinRequest: 2391 ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break; 2392 case OverlayMsg::typeJoinReply: 2393 ret = handleJoinReply(sub_buff, bcLink ); break; 2394 2395 // link specific messages 2396 case OverlayMsg::typeLinkRequest: 2397 ret = handleLinkRequest(overlayMsg, ld ); break; 2398 case OverlayMsg::typeLinkReply: 2399 ret = handleLinkReply(overlayMsg, sub_buff, ld ); break; 2400 case OverlayMsg::typeLinkUpdate: 2401 ret = handleLinkUpdate(overlayMsg, ld ); break; 2402 case OverlayMsg::typeKeepAlive: 2403 case OverlayMsg::typeKeepAliveReply: 2404 ret = handleLinkAlive(overlayMsg, ld ); break; 2405 case OverlayMsg::typeLinkDirect: 2406 ret = handleLinkDirect(overlayMsg, ld ); break; 2407 2408 case OverlayMsg::typeLinkClose: 2409 { 2410 dropLink(overlayMsg->getDestinationLink()); 2411 __removeDroppedLink(overlayMsg->getDestinationLink()); 2412 2413 break; 2414 } 2415 2416 /// ping over overlay path (or similar) 2417 case OverlayMsg::typePing: 2418 { 2419 ret = handlePing(overlayMsg, ld); 2420 break; 2421 } 2422 case OverlayMsg::typePong: 2423 { 2424 ret = handlePong(overlayMsg, ld); 2425 break; 2426 } 2427 2428 // handle unknown message type 2429 default: 2430 { 2431 logging_error( "received message in invalid state! don't know " << 2432 "what to do with this message of type " << overlayMsg->getType() ); 2433 ret = false; 2434 break; 2435 } 2436 } 2437 } 2438 catch ( reboost::illegal_sub_buffer& e ) 2439 { 2440 logging_error( "Failed to create sub-buffer while reading message: »" 2441 << e.what() 2442 << "« Message too short? "); 2443 2444 assert(false); // XXX 1713 2445 } 1714 2446 … … 1720 2452 // ---------------------------------------------------------------------------- 1721 2453 1722 void BaseOverlay::broadcastMessage( Message* message, const ServiceID& service) {2454 void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) { 1723 2455 1724 2456 logging_debug( "broadcasting message to all known nodes " << 1725 2457 "in the overlay from service " + service.toString() ); 1726 1727 if(message == NULL) return;1728 message->setReleasePayload(false);1729 2458 1730 2459 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true); … … 1732 2461 NodeID& id = nodes.at(i); 1733 2462 if(id == this->nodeId) continue; // don't send to ourselfs 1734 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send 1735 sendMessage( message, id, service );2463 2464 sendMessage( message, id, priority, service ); 1736 2465 } 1737 2466 } … … 1755 2484 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const { 1756 2485 vector<LinkID> linkvector; 1757 BOOST_FOREACH( LinkDescriptor* ld, links ) {2486 foreach( LinkDescriptor* ld, links ) { 1758 2487 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { 1759 2488 linkvector.push_back( ld->overlayId ); … … 1779 2508 updateVisual(); 1780 2509 } 2510 2511 2512 2513 /* link status */ 2514 bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const 2515 { 2516 const LinkDescriptor* ld = getDescriptor(lnk); 2517 2518 if (!ld) 2519 return false; 2520 2521 return ld->communicationUp && !ld->relayed; 2522 } 2523 2524 int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const 2525 { 2526 const LinkDescriptor* ld = getDescriptor(lnk); 2527 2528 if (!ld) 2529 return -1; 2530 2531 return ld->hops; 2532 } 2533 2534 2535 bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const 2536 { 2537 time_t now = time(NULL); 2538 2539 return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..? 2540 } 2541 2542 bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const 2543 { 2544 return isLinkVital(link) && link->communicationUp && !link->relayed; 2545 } 2546 2547 /* [link status] */ 2548 1781 2549 1782 2550 void BaseOverlay::updateVisual(){ … … 1878 2646 static set<NodeID> linkset; 1879 2647 set<NodeID> remotenodes; 1880 BOOST_FOREACH( LinkDescriptor* ld, links ) {1881 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)2648 foreach( LinkDescriptor* ld, links ) { 2649 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) 1882 2650 continue; 1883 2651 … … 1895 2663 do{ 1896 2664 changed = false; 1897 BOOST_FOREACH(NodeID n, linkset){2665 foreach(NodeID n, linkset){ 1898 2666 if(remotenodes.find(n) == remotenodes.end()){ 1899 2667 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, ""); … … 1908 2676 do{ 1909 2677 changed = false; 1910 BOOST_FOREACH(NodeID n, remotenodes){2678 foreach(NodeID n, remotenodes){ 1911 2679 if(linkset.find(n) == linkset.end()){ 1912 2680 visualInstance.visConnect(visualIdBase, this->nodeId, n, ""); … … 1933 2701 // dump link state 1934 2702 s << "--- link state -------------------------------" << endl; 1935 BOOST_FOREACH( LinkDescriptor* ld, links ) {2703 foreach( LinkDescriptor* ld, links ) { 1936 2704 s << "link " << i << ": " << ld << endl; 1937 2705 i++; -
source/ariba/overlay/BaseOverlay.h
r10653 r12060 47 47 #include <vector> 48 48 #include <deque> 49 #include <stdexcept> 49 50 #include <boost/foreach.hpp> 51 52 #ifdef ECLIPSE_PARSER 53 #define foreach(a, b) for(a : b) 54 #else 55 #define foreach(a, b) BOOST_FOREACH(a, b) 56 #endif 50 57 51 58 #include "ariba/utility/messages.h" … … 64 71 #include "ariba/overlay/modules/OverlayStructureEvents.h" 65 72 #include "ariba/overlay/OverlayBootstrap.h" 73 #include "ariba/overlay/SequenceNumber.h" 66 74 67 75 // forward declarations … … 92 100 using ariba::communication::BaseCommunication; 93 101 using ariba::communication::CommunicationEvents; 102 103 // transport 104 //using ariba::transport::system_priority; 94 105 95 106 // utilities … … 103 114 using ariba::utility::Demultiplexer; 104 115 using ariba::utility::MessageReceiver; 105 using ariba::utility::MessageSender;106 116 using ariba::utility::seqnum_t; 107 117 using ariba::utility::Timer; … … 110 120 namespace overlay { 111 121 112 using namespace ariba::addressing; 122 123 124 class message_not_sent: public std::runtime_error 125 { 126 public: 127 /** Takes a character string describing the error. */ 128 explicit message_not_sent(const string& __arg) : 129 std::runtime_error(__arg) 130 { 131 } 132 133 virtual ~message_not_sent() throw() {} 134 }; 135 136 113 137 114 138 class LinkDescriptor; … … 121 145 protected Timer { 122 146 123 friend class OneHop; 147 // friend class OneHop; // DEPRECATED 124 148 friend class Chord; 125 149 friend class ariba::SideportListener; … … 128 152 129 153 public: 130 131 154 /** 132 155 * Constructs an empty non-functional base overlay instance … … 142 165 * Starts the Base Overlay instance 143 166 */ 144 void start(BaseCommunication &_basecomm, const NodeID& _nodeid);167 void start(BaseCommunication* _basecomm, const NodeID& _nodeid); 145 168 146 169 /** … … 161 184 * Starts a link establishment procedure to the specfied node 162 185 * for the service with id service 163 * 186 * 164 187 * @param node Destination node id 165 188 * @param service Service to connect to … … 179 202 void dropLink( const LinkID& link ); 180 203 204 205 206 /* +++++ Message sending +++++ */ 207 208 181 209 /// sends a message over an existing link 182 seqnum_t sendMessage(const Message* message, const LinkID& link ); 210 const SequenceNumber& sendMessage(reboost::message_t message, 211 const LinkID& link, 212 uint8_t priority ) throw(message_not_sent); 183 213 184 214 /// sends a message to a node and a specific service 185 seqnum_t sendMessage(const Message* message, const NodeID& remote, 186 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 215 const SequenceNumber& sendMessage(reboost::message_t message, 216 const NodeID& remote, 217 uint8_t priority, 218 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 219 187 220 188 221 /** … … 191 224 * @return NodeID of the (closest) destination node; 192 225 */ 193 NodeID sendMessageCloserToNodeID( const Message*message, const NodeID& address,194 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);226 NodeID sendMessageCloserToNodeID(reboost::message_t message, const NodeID& address, 227 uint8_t priority, const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 195 228 196 229 /** … … 198 231 * Depending on the structure of the overlay, this can be very different. 199 232 */ 200 void broadcastMessage(Message* message, const ServiceID& service); 201 233 void broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority); 234 235 236 /* +++++ [Message sending] +++++ */ 237 238 239 202 240 /** 203 241 * Returns the end-point descriptor of a link. … … 294 332 */ 295 333 void leaveSpoVNet(); 334 335 336 /* link status */ 337 bool isLinkDirect(const ariba::LinkID& lnk) const; 338 int getHopCount(const ariba::LinkID& lnk) const; 339 340 bool isLinkVital(const LinkDescriptor* link) const; 341 bool isLinkDirectVital(const LinkDescriptor* link) const; 296 342 297 343 protected: 298 /** 299 * @see ariba::communication::CommunicationEvents.h 300 */ 301 virtual void onLinkUp(const LinkID& id, const address_v* local, 302 const address_v* remote); 303 304 /** 305 * @see ariba::communication::CommunicationEvents.h 306 */ 307 virtual void onLinkDown(const LinkID& id, const address_v* local, 308 const address_v* remote); 309 310 /** 311 * @see ariba::communication::CommunicationEvents.h 312 */ 313 virtual void onLinkChanged(const LinkID& id, 314 const address_v* oldlocal, const address_v* newlocal, 315 const address_v* oldremote, const address_v* newremote); 316 317 /** 318 * @see ariba::communication::CommunicationEvents.h 319 */ 320 virtual void onLinkFail(const LinkID& id, const address_v* local, 321 const address_v* remote); 322 323 /** 324 * @see ariba::communication::CommunicationEvents.h 325 */ 326 virtual void onLinkQoSChanged(const LinkID& id, 327 const address_v* local, const address_v* remote, 328 const QoSParameterSet& qos); 329 330 /** 331 * @see ariba::communication::CommunicationEvents.h 332 */ 333 virtual bool onLinkRequest(const LinkID& id, const address_v* local, 334 const address_v* remote); 335 344 345 /** 346 * @see ariba::communication::CommunicationEvents.h 347 */ 348 virtual bool onLinkRequest(const LinkID& id, 349 const addressing2::EndpointPtr local, 350 const addressing2::EndpointPtr remote); 351 352 /** 353 * @see ariba::communication::CommunicationEvents.h 354 */ 355 virtual void onLinkUp(const LinkID& id, 356 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 357 358 /** 359 * @see ariba::communication::CommunicationEvents.h 360 */ 361 virtual void onLinkDown(const LinkID& id, 362 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 363 364 /** 365 * @see ariba::communication::CommunicationEvents.h 366 */ 367 virtual void onLinkChanged(const LinkID& id, 368 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 369 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote); 370 371 /** 372 * @see ariba::communication::CommunicationEvents.h 373 * 374 * NOTE: Just calls onLinkDown (at the moment..) 375 */ 376 virtual void onLinkFail(const LinkID& id, 377 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 378 379 /** 380 * @see ariba::communication::CommunicationEvents.h 381 */ 382 // virtual void onLinkQoSChanged(const LinkID& id, 383 // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 384 // const QoSParameterSet& qos); 385 386 387 388 336 389 /** 337 390 * Processes a received message from BaseCommunication … … 340 393 * the node the message came from! 341 394 */ 342 virtual bool receiveMessage( const Message* message, const LinkID& link, 343 const NodeID& ); 395 virtual bool receiveMessage( reboost::shared_buffer_t message, 396 const LinkID& link, 397 const NodeID&, 398 bool bypass_overlay ); 344 399 345 400 /** … … 359 414 std::string getLinkHTMLInfo(); 360 415 416 417 private: 418 /// NOTE: "id" is an Overlay-LinkID 419 void __onLinkEstablishmentFailed(const LinkID& id); 420 421 /// called from typeLinkClose-handler 422 void __removeDroppedLink(const LinkID& link); 423 361 424 private: 362 425 /// is the base overlay started yet … … 396 459 397 460 /// demultiplexes a incoming message with link descriptor 398 bool handleMessage( const Message*message, LinkDescriptor* ld,461 bool handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, 399 462 const LinkID bcLink = LinkID::UNSPECIFIED ); 400 463 401 464 // handle data and signalling messages 402 bool handleData( OverlayMsg* msg, LinkDescriptor* ld ); 465 bool handleData( reboost::shared_buffer_t message, OverlayMsg* msg, LinkDescriptor* ld ); 466 bool handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg ); 403 467 bool handleSignaling( OverlayMsg* msg, LinkDescriptor* ld ); 404 468 405 469 // handle join request / reply messages 406 bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink );407 bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink );470 bool handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ); 471 bool handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ); 408 472 409 473 // handle link messages 410 474 bool handleLinkRequest( OverlayMsg* msg, LinkDescriptor* ld ); 411 bool handleLinkReply( OverlayMsg* msg, LinkDescriptor* ld );475 bool handleLinkReply( OverlayMsg* msg, reboost::shared_buffer_t sub_message, LinkDescriptor* ld ); 412 476 bool handleLinkUpdate( OverlayMsg* msg, LinkDescriptor* ld ); 413 477 bool handleLinkDirect( OverlayMsg* msg, LinkDescriptor* ld ); 414 478 bool handleLinkAlive( OverlayMsg* msg, LinkDescriptor* ld ); 479 480 // ping-pong over overlaypath/routing 481 bool handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ); 482 bool handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ); 415 483 416 484 … … 478 546 // internal message delivery ----------------------------------------------- 479 547 548 // Convert OverlayMessage into new format and give it down to BaseCommunication 549 seqnum_t send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ); 550 551 480 552 /// routes a message to its destination node 481 void route( OverlayMsg* message );482 553 void route( OverlayMsg* message, const NodeID& last_hop = NodeID::UNSPECIFIED ); 554 483 555 /// sends a raw message to another node, delivers it to the base overlay class 484 seqnum_t send( OverlayMsg* message, const NodeID& destination ); 556 /// may throw "message_not_sent"-exception 557 seqnum_t send( OverlayMsg* message, 558 const NodeID& destination, 559 uint8_t priority, 560 const NodeID& last_hop = NodeID::UNSPECIFIED ) 561 throw(message_not_sent); 485 562 486 563 /// send a raw message using a link descriptor, delivers it to the base overlay class 487 seqnum_t send( OverlayMsg* message, LinkDescriptor* ld, 488 bool ignore_down = false ); 564 seqnum_t send( OverlayMsg* message, 565 LinkDescriptor* ld, 566 uint8_t priority ) throw(message_not_sent); 489 567 490 568 /// send a message using a node id using overlay routing 491 569 /// sets necessary fields in the overlay message! 492 seqnum_t send_node( OverlayMsg* message, const NodeID& remote, 493 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 570 /// may throw "message_not_sent"-exception 571 seqnum_t send_node( OverlayMsg* message, const NodeID& remote, uint8_t priority, 572 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID) throw(message_not_sent); 494 573 495 574 /// send a message using a node id using overlay routing using a link 496 575 /// sets necessary fields in the overlay message! 497 seqnum_t send_link( OverlayMsg* message, const LinkID& link, 498 bool ignore_down = false ); 499 576 void send_link( OverlayMsg* message, 577 const LinkID& link, 578 uint8_t priority ) throw(message_not_sent); 579 580 581 /// sends a notification to a sender from whom we just dropped a message 582 void report_lost_message( const OverlayMsg* message ); 583 500 584 // misc -------------------------------------------------------------------- 501 585 -
source/ariba/overlay/CMakeLists.txt
r10700 r12060 41 41 LinkDescriptor.h 42 42 OverlayBootstrap.h 43 SequenceNumber.h 43 44 ) 44 45 … … 47 48 LinkDescriptor.cpp 48 49 OverlayBootstrap.cpp 50 SequenceNumber.cpp 49 51 ) 50 52 -
source/ariba/overlay/LinkDescriptor.h
r6961 r12060 12 12 #include "ariba/communication/EndpointDescriptor.h" 13 13 #include "ariba/CommunicationListener.h" 14 15 // reboost messages 16 #include "ariba/utility/transport/messages/message.hpp" 17 #include <ariba/utility/misc/sha1.h> 18 19 #include "ariba/overlay/SequenceNumber.h" 20 14 21 15 22 namespace ariba { … … 37 44 class LinkDescriptor { 38 45 public: 46 struct message_queue_entry 47 { 48 reboost::message_t message; 49 uint8_t priority; 50 }; 51 39 52 // ctor 40 53 LinkDescriptor() { 54 time_t now = time(NULL); 55 41 56 // default values 42 57 this->up = false; 58 // this->closing = false; 59 this->failed = false; 43 60 this->fromRemote = false; 44 61 this->remoteNode = NodeID::UNSPECIFIED; … … 46 63 this->communicationUp = false; 47 64 this->communicationId = LinkID::UNSPECIFIED; 48 this->keepAlive Time = time(NULL);49 this->keepAlive Missed = 0;65 this->keepAliveReceived = now; 66 this->keepAliveSent = now; 50 67 this->relaying = false; 51 this->timeRelaying = time(NULL);68 this->timeRelaying = now; 52 69 this->dropAfterRelaying = false; 53 70 this->service = ServiceID::UNSPECIFIED; … … 56 73 this->remoteLink = LinkID::UNSPECIFIED; 57 74 this->autolink = false; 58 this->lastuse = time(NULL);75 this->lastuse = now; 59 76 this->retryCounter = 0; 77 this->hops = -1; 78 79 this->transmit_seqnums = false; // XXX 60 80 } 61 81 … … 67 87 // general information about the link -------------------------------------- 68 88 bool up; ///< flag whether this link is up and running 89 // bool closing; ///< flag, whether this link is in the regular process of closing 90 bool failed; ///< flag, whether communication is (assumed to be) not/no longer possible on this link 69 91 bool fromRemote; ///< flag, whether this link was requested from remote 70 92 NodeID remoteNode; ///< remote end-point node 71 bool isVital() {72 return up && keepAliveMissed == 0;73 }74 bool isDirectVital() {75 return isVital() && communicationUp && !relayed;76 }77 93 78 94 … … 82 98 bool communicationUp; ///< flag, whether the communication is up 83 99 100 // sequence numbers -------------------------------------------------------- 101 SequenceNumber last_sent_seqnum; 102 bool transmit_seqnums; 103 84 104 // direct link retries ----------------------------------------------------- 85 105 EndpointDescriptor endpoint; … … 87 107 88 108 // link alive information -------------------------------------------------- 89 time_t keepAlive Time; ///< the last time a keep-alive message was received90 int keepAliveMissed; ///< the number of missed keep-alive messages109 time_t keepAliveReceived; ///< the last time a keep-alive message was received 110 time_t keepAliveSent; ///< the number of missed keep-alive messages 91 111 void setAlive() { 92 keepAliveMissed = 0;93 keepAlive Time= time(NULL);112 // keepAliveSent = time(NULL); 113 keepAliveReceived = time(NULL); 94 114 } 95 115 … … 98 118 LinkID remoteLink; ///< the remote link id 99 119 vector<NodeID> routeRecord; 120 int hops; 100 121 101 122 // relay state ------------------------------------------------------------- … … 114 135 // auto links -------------------------------------------------------------- 115 136 bool autolink; ///< flag, whether this link is a auto-link 116 time_t lastuse; ///< time, when the link was last used 117 deque< Message*> messageQueue; ///< waiting messages to be delivered137 time_t lastuse; ///< time, when the link was last used XXX AUTO_LINK-ONLY 138 deque<message_queue_entry> messageQueue; ///< waiting messages to be delivered 118 139 void setAutoUsed() { 119 140 if (autolink) lastuse = time(NULL); … … 121 142 /// drops waiting auto-link messages 122 143 void flushQueue() { 123 BOOST_FOREACH( Message* msg, messageQueue ) delete msg; 144 // BOOST_FOREACH( Message* msg, messageQueue ) delete msg; // XXX MARIO: shouldn't be necessary anymore, since we're using shared pointers 124 145 messageQueue.clear(); 125 146 } … … 127 148 // string representation --------------------------------------------------- 128 149 std::string to_string() const { 150 time_t now = time(NULL); 151 129 152 std::ostringstream s; 153 if ( relayed ) 154 s << "[RELAYED-"; 155 else 156 s << "[DIRECT-"; 157 s << "LINK] "; 158 s << "id=" << overlayId.toString().substr(0,4) << " "; 159 s << "serv=" << service.toString() << " "; 130 160 s << "up=" << up << " "; 131 161 s << "init=" << !fromRemote << " "; 132 s << "id=" << overlayId.toString().substr(0,4) << " ";133 s << "serv=" << service.toString() << " ";134 162 s << "node=" << remoteNode.toString().substr(0,4) << " "; 135 163 s << "relaying=" << relaying << " "; 136 s << " miss=" << keepAliveMissed << "";164 s << "last_received=" << now - keepAliveReceived << "s "; 137 165 s << "auto=" << autolink << " "; 166 s << "hops=" << hops << " "; 138 167 if ( relayed ) { 139 168 s << "| Relayed: "; … … 146 175 } else { 147 176 s << "| Direct: "; 148 s << "using id=" << communicationId.toString().substr(0,4) << " ";177 s << "using [COMMUNICATION-LINK] id=" << communicationId.toString().substr(0,4) << " "; 149 178 s << "(up=" << communicationUp << ") "; 150 179 } -
source/ariba/overlay/messages/JoinReply.cpp
r3690 r12060 44 44 vsznDefault(JoinReply); 45 45 46 JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed, const EndpointDescriptor _bootstrapEp) 47 : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed ), bootstrapEp( _bootstrapEp ){ 46 JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed) 47 : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed ) 48 { 48 49 } 49 50 … … 64 65 } 65 66 66 const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){67 return bootstrapEp;68 }67 //const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){ 68 // return bootstrapEp; 69 //} 69 70 70 71 }} // ariba::overlay -
source/ariba/overlay/messages/JoinReply.h
r5870 r12060 40 40 #define JOIN_REPLY_H__ 41 41 42 #include "ariba/utility/messages.h" 42 //#include "ariba/utility/messages.h" 43 #include "ariba/utility/messages/Message.h" 43 44 #include "ariba/utility/serialization.h" 44 45 #include "ariba/utility/types/SpoVNetID.h" 45 46 #include "ariba/utility/types/NodeID.h" 46 47 #include "ariba/utility/types/OverlayParameterSet.h" 47 #include "ariba/communication/EndpointDescriptor.h"48 //#include "ariba/communication/EndpointDescriptor.h" 48 49 49 50 using ariba::utility::OverlayParameterSet; … … 51 52 using ariba::utility::SpoVNetID; 52 53 using ariba::utility::NodeID; 53 using ariba::communication::EndpointDescriptor;54 //using ariba::communication::EndpointDescriptor; 54 55 55 56 namespace ariba { … … 64 65 OverlayParameterSet param; //< overlay parameters 65 66 bool joinAllowed; //< join successfull or access denied 66 EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface67 // EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface 67 68 68 69 public: … … 70 71 const SpoVNetID _spovnetid = SpoVNetID::UNSPECIFIED, 71 72 const OverlayParameterSet _param = OverlayParameterSet::DEFAULT, 72 bool _joinAllowed = false ,73 const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED() 73 bool _joinAllowed = false /*, 74 const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED()*/ 74 75 ); 75 76 … … 79 80 const OverlayParameterSet& getParam(); 80 81 bool getJoinAllowed(); 81 const EndpointDescriptor& getBootstrapEndpoint();82 // const EndpointDescriptor& getBootstrapEndpoint(); 82 83 }; 83 84 … … 86 87 sznBeginDefault( ariba::overlay::JoinReply, X ) { 87 88 uint8_t ja = joinAllowed; 88 X && &spovnetid && param && bootstrapEp && ja; 89 X && &spovnetid && param; 90 // X && bootstrapEp; 91 X && ja; 89 92 if (X.isDeserializer()) joinAllowed = ja; 90 93 } sznEnd(); -
source/ariba/overlay/messages/OverlayMsg.h
r10653 r12060 47 47 #include "ariba/utility/types/NodeID.h" 48 48 #include "ariba/utility/types/LinkID.h" 49 #include "ariba/communication/EndpointDescriptor.h" 50 49 // #include <ariba/utility/misc/sha1.h> 50 #include "ariba/overlay/SequenceNumber.h" 51 51 52 52 namespace ariba { … … 57 57 using ariba::utility::ServiceID; 58 58 using ariba::utility::Message; 59 using ariba::communication::EndpointDescriptor;59 //using ariba::communication::EndpointDescriptor; 60 60 using_serialization; 61 61 … … 64 64 * between nodes. 65 65 * 66 * @author Sebastian Mies <mies@tm.uka.de> 66 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock 67 67 */ 68 68 class OverlayMsg: public Message { VSERIALIZEABLE; … … 75 75 maskTransfer = 0x10, ///< bit mask for transfer messages 76 76 typeData = 0x11, ///< message contains data for higher layers 77 typeMessageLost = 0x12, ///< message contains info about a dropped message 77 78 78 79 // join signaling … … 87 88 typeLinkUpdate = 0x33, ///< update message for link association 88 89 typeLinkDirect = 0x34, ///< direct connection has been established 89 typeLinkAlive = 0x35, ///< keep-alive message 90 typeKeepAlive = 0x35, ///< keep-alive message 91 typeKeepAliveReply = 0x36, ///< keep-alive message (replay) 92 typeLinkClose = 0x37, 90 93 91 94 /// DHT routed messages … … 100 103 maskDHTResponse = 0x50, ///< bit mask for dht responses 101 104 typeDHTData = 0x51, ///< DHT get data 105 106 /// misc message types 107 typePing = 0x44, 108 typePong = 0x45, 102 109 103 110 // topology signaling … … 105 112 typeSignalingEnd = 0xFF ///< end of the signaling types 106 113 }; 114 115 /// message flags (uint8_t) 116 enum flags_ 117 { 118 flagRelayed = 1 << 0, 119 flagRegisterRelay = 1 << 1, 120 flagRouteRecord = 1 << 2, 121 flagSeqNum1 = 1 << 3, 122 flagSeqNum2 = 1 << 4, 123 flagAutoLink = 1 << 5, 124 flagLinkMessage = 1 << 6, 125 flagHasMoreFlags = 1 << 7 126 }; 107 127 108 128 /// default constructor … … 114 134 const LinkID& _sourceLink = LinkID::UNSPECIFIED, 115 135 const LinkID& _destinationLink = LinkID::UNSPECIFIED ) 116 : type(type), flags(0), hops(0), ttl(10),136 : type(type), flags(0), extended_flags(0), hops(0), ttl(10), priority(0), 117 137 service(_service), 118 138 sourceNode(_sourceNode), destinationNode(_destinationNode), … … 125 145 // copy constructor 126 146 OverlayMsg(const OverlayMsg& rhs) 127 : type(rhs.type), flags(rhs.flags), hops(rhs.hops), ttl(rhs.ttl), 128 service(rhs.service), 147 : type(rhs.type), flags(rhs.flags), extended_flags(rhs.extended_flags), 148 hops(rhs.hops), ttl(rhs.ttl), 149 priority(rhs.priority), service(rhs.service), 129 150 sourceNode(rhs.sourceNode), destinationNode(rhs.destinationNode), 130 151 sourceLink(rhs.sourceLink), destinationLink(rhs.destinationLink), … … 149 170 } 150 171 172 /// priority ------------------------------------------------------------------ 173 174 uint8_t getPriority() const { 175 return priority; 176 } 177 178 void setPriority(uint8_t priority) { 179 this->priority = priority; 180 } 181 151 182 /// flags ------------------------------------------------------------------ 152 183 153 184 bool isRelayed() const { 154 return (flags & 0x01)!=0;185 return (flags & flagRelayed)!=0; 155 186 } 156 187 157 188 void setRelayed( bool relayed = true ) { 158 if (relayed) flags |= 1; else flags &= ~1;189 if (relayed) flags |= flagRelayed; else flags &= ~flagRelayed; 159 190 } 160 191 161 192 bool isRegisterRelay() const { 162 return (flags & 0x02)!=0;193 return (flags & flagRegisterRelay)!=0; 163 194 } 164 195 165 196 void setRegisterRelay( bool relayed = true ) { 166 if (relayed) flags |= 0x02; else flags &= ~0x02;197 if (relayed) flags |= flagRegisterRelay; else flags &= ~flagRegisterRelay; 167 198 } 168 199 169 200 bool isRouteRecord() const { 170 return (flags & 0x04)!=0;201 return (flags & flagRouteRecord)!=0; 171 202 } 172 203 173 204 void setRouteRecord( bool route_record = true ) { 174 if (route_record) flags |= 0x04; else flags &= ~0x04;205 if (route_record) flags |= flagRouteRecord; else flags &= ~flagRouteRecord; 175 206 } 176 207 177 208 bool isAutoLink() const { 178 return (flags & 0x80) == 0x80;209 return (flags & flagAutoLink) == flagAutoLink; 179 210 } 180 211 181 212 void setAutoLink(bool auto_link = true ) { 182 if (auto_link) flags |= 0x80; else flags &= ~0x80;213 if (auto_link) flags |= flagAutoLink; else flags &= ~flagAutoLink; 183 214 } 184 215 185 216 bool isLinkMessage() const { 186 return (flags & 0x40)!=0;217 return (flags & flagLinkMessage)!=0; 187 218 } 188 219 189 220 void setLinkMessage(bool link_info = true ) { 190 if (link_info) flags |= 0x40; else flags &= ~0x40; 191 } 192 193 bool containsSourceEndpoint() const { 194 return (flags & 0x20)!=0; 195 } 196 197 void setContainsSourceEndpoint(bool contains_endpoint) { 198 if (contains_endpoint) flags |= 0x20; else flags &= ~0x20; 199 } 221 if (link_info) flags |= flagLinkMessage; else flags &= ~flagLinkMessage; 222 } 223 224 bool hasExtendedFlags() const { 225 return (flags & flagHasMoreFlags) == flagHasMoreFlags; 226 } 200 227 201 228 /// number of hops and time to live ---------------------------------------- … … 264 291 this->destinationLink = link; 265 292 setLinkMessage(); 266 }267 268 void setSourceEndpoint( const EndpointDescriptor& endpoint ) {269 sourceEndpoint = endpoint;270 setContainsSourceEndpoint(true);271 }272 273 const EndpointDescriptor& getSourceEndpoint() const {274 return sourceEndpoint;275 293 } 276 294 … … 284 302 destinationLink = dummyLink; 285 303 hops = 0; 304 routeRecord.clear(); 286 305 } 287 306 … … 294 313 routeRecord.push_back(node); 295 314 } 315 316 /// sequence numbers 317 bool hasShortSeqNum() const 318 { 319 return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum1; 320 } 321 322 bool hasLongSeqNum() const 323 { 324 return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum2; 325 } 326 327 void setSeqNum(const SequenceNumber& sequence_number) 328 { 329 this->seqnum = sequence_number; 330 331 // short seqnum 332 if ( sequence_number.isShortSeqNum() ) 333 { 334 flags |= flagSeqNum1; 335 flags &= ~flagSeqNum2; 336 } 337 // longseqnum 338 else if ( sequence_number.isShortSeqNum() ) 339 { 340 flags &= ~flagSeqNum1; 341 flags |= flagSeqNum2; 342 } 343 // no seqnum 344 else 345 { 346 flags &= ~flagSeqNum1; 347 flags &= ~flagSeqNum2; 348 } 349 } 350 351 const SequenceNumber& getSeqNum() const 352 { 353 return seqnum; 354 } 355 296 356 297 357 private: 298 uint8_t type, flags, hops, ttl;358 uint8_t type, flags, extended_flags, hops, ttl, priority; 299 359 ServiceID service; 300 360 NodeID sourceNode; … … 302 362 LinkID sourceLink; 303 363 LinkID destinationLink; 304 EndpointDescriptor sourceEndpoint;364 // EndpointDescriptor sourceEndpoint; 305 365 vector<NodeID> routeRecord; 366 SequenceNumber seqnum; 306 367 }; 307 368 … … 311 372 sznBeginDefault( ariba::overlay::OverlayMsg, X ){ 312 373 // header 313 X && type && flags && hops && ttl; 374 X && type && flags; 375 376 if ( hasExtendedFlags() ) 377 X && extended_flags; 378 379 X && hops && ttl; 314 380 315 381 // addresses 316 382 X && &service && &sourceNode && &destinationNode; 383 384 // priority 385 X && priority; 317 386 318 387 // message is associated with a end-to-end link … … 320 389 X && &sourceLink && &destinationLink; 321 390 322 // message is associated with a source end-point 323 if (containsSourceEndpoint()) 324 X && sourceEndpoint; 325 391 392 /* seqnum */ 393 // serialize 394 if ( X.isSerializer() ) 395 { 396 if ( hasShortSeqNum() ) 397 { 398 uint32_t short_seqnum; 399 short_seqnum = seqnum.getShortSeqNum(); 400 X && short_seqnum; 401 } 402 if ( hasLongSeqNum() ) 403 { 404 uint64_t long_seqnum; 405 long_seqnum = seqnum.getLongSeqNum(); 406 X && long_seqnum; 407 } 408 } 409 // deserialize 410 else 411 { 412 if ( hasShortSeqNum() ) 413 { 414 uint32_t short_seqnum; 415 X && short_seqnum; 416 seqnum = ariba::overlay::SequenceNumber(short_seqnum); 417 } 418 if ( hasLongSeqNum() ) 419 { 420 uint64_t long_seqnum; 421 X && long_seqnum; 422 seqnum = ariba::overlay::SequenceNumber(long_seqnum); 423 } 424 } 425 426 326 427 // message should record its route 327 428 if (isRouteRecord()) { … … 333 434 334 435 // payload 335 X && Payload();436 // X && Payload(); 336 437 } sznEnd(); 337 438 -
source/ariba/overlay/modules/OverlayFactory.cpp
r3718 r12060 41 41 // structured overlays 42 42 #include "chord/Chord.h" 43 #include "onehop/OneHop.h" 43 //#include "onehop/OneHop.h" //DEPRECATED 44 44 45 45 namespace ariba { … … 57 57 return new Chord( baseoverlay, nodeid, routeReceiver, param ); 58 58 59 case OverlayParameterSet::OverlayStructureOneHop:60 return new OneHop( baseoverlay, nodeid, routeReceiver, param );59 // case OverlayParameterSet::OverlayStructureOneHop: 60 // return new OneHop( baseoverlay, nodeid, routeReceiver, param ); 61 61 62 62 default: 63 // NEVER return "NULL" 64 assert(false); 65 throw 42; 63 66 return NULL; 64 67 } 65 68 69 // NEVER return "NULL" 70 assert(false); 71 throw 42; 66 72 return NULL; 67 73 } -
source/ariba/overlay/modules/OverlayInterface.cpp
r6854 r12060 68 68 69 69 void OverlayInterface::onLinkFail(const LinkID& lnk, const NodeID& remote) { 70 onLinkDown(lnk, remote); 70 71 } 71 72 … … 79 80 } 80 81 81 void OverlayInterface::onMessage(const DataMessage& msg, const NodeID& remote, 82 void OverlayInterface::onMessage(OverlayMsg* msg, 83 reboost::shared_buffer_t sub_msg, 84 const NodeID& remote, 82 85 const LinkID& lnk) { 83 86 } -
source/ariba/overlay/modules/OverlayInterface.h
r10573 r12060 145 145 */ 146 146 virtual const LinkID& getNextLinkId( const NodeID& id ) const = 0; 147 147 148 /** 149 * Returns link ids of possible next hops a route message could take, 150 * sorted by "quality" (e.g. overlay-distance). 151 * 152 * The »num« parameter can be used to specify the desired number of elements 153 * in the returned vector. This is intendet for optimizations. The 154 * implementation may choose to return a different number of elements than 155 * requested. 156 * 157 * NOTE: The returned vector may contain »unspecified« links. These refer to 158 * to the own node. (e.g. If there's no closer node, the top element in the 159 * returned vector is unsoecified.) 160 * 161 * @param id The destination node id 162 * @param num The desired number of elements in the returned vector. 163 * (0 means »not specified/max)« 164 * @return A sorted vector of link ids to possible next hops. 165 */ 166 virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode( 167 const NodeID& id, int num = 0 ) const = 0; 168 148 169 /** 149 170 * Returns the NodeID of the next hop a route message would take. … … 176 197 177 198 /// @see CommunicationListener 178 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 199 virtual void onMessage(OverlayMsg* msg, 200 reboost::shared_buffer_t sub_msg, 201 const NodeID& remote, 179 202 const LinkID& lnk = LinkID::UNSPECIFIED); 180 203 -
source/ariba/overlay/modules/OverlayStructureEvents.h
r5151 r12060 41 41 42 42 #include "ariba/utility/types/NodeID.h" 43 #include "ariba/utility/messages.h" 43 #include "ariba/utility/types/LinkID.h" 44 #include "ariba/utility/messages/Message.h" 44 45 45 46 using ariba::utility::NodeID; 47 using ariba::utility::LinkID; 46 48 using ariba::utility::Message; 47 49 … … 50 52 51 53 class OverlayInterface; 52 class OneHop;54 //class OneHop; 53 55 54 56 class OverlayStructureEvents { 55 57 friend class ariba::overlay::OverlayInterface; 56 friend class ariba::overlay::OneHop;58 // friend class ariba::overlay::OneHop; 57 59 58 60 public: -
source/ariba/overlay/modules/chord/Chord.cpp
r10572 r12060 43 43 #include "detail/chord_routing_table.hpp" 44 44 45 #include "messages/Discovery.h" 45 //#include "messages/Discovery.h" // XXX DEPRECATED 46 46 47 47 namespace ariba { … … 55 55 typedef chord_routing_table::item route_item; 56 56 57 using ariba::transport::system_priority; 58 57 59 use_logging_cpp( Chord ); 60 61 62 ////// Messages 63 struct DiscoveryMessage 64 { 65 /** 66 * DiscoveryMessage 67 * - type 68 * - data 69 * - Endpoint 70 */ 71 72 // type enum 73 enum type_ { 74 invalid = 0, 75 normal = 1, 76 successor = 2, 77 predecessor = 3 78 }; 79 80 81 // data 82 uint8_t type; 83 uint8_t ttl; 84 EndpointDescriptor endpoint; 85 86 // serialize 87 reboost::message_t serialize() 88 { 89 // serialize endpoint 90 reboost::message_t msg = endpoint.serialize(); 91 92 // serialize type and ttl 93 uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data(); 94 buff1[0] = type; 95 buff1[1] = ttl; 96 97 return msg; 98 } 99 100 //deserialize 101 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff) 102 { 103 // deserialize type and ttl 104 const uint8_t* bytes = buff.data(); 105 type = bytes[0]; 106 ttl = bytes[1]; 107 108 // deserialize endpoint 109 return endpoint.deserialize(buff(2*sizeof(uint8_t))); 110 } 111 }; 112 58 113 59 114 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, … … 102 157 103 158 /// helper: sends a message using the "base overlay" 104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) { 105 if (link.isUnspecified()) return 0; 106 return baseoverlay.send_link( msg, link ); 159 void Chord::send( OverlayMsg* msg, const LinkID& link ) { 160 if (link.isUnspecified()) 161 return; 162 163 baseoverlay.send_link( msg, link, system_priority::OVERLAY ); 164 } 165 166 void Chord::send_node( OverlayMsg* message, const NodeID& remote ) 167 { 168 try 169 { 170 baseoverlay.send( message, remote, system_priority::OVERLAY ); 171 } 172 catch ( message_not_sent& e ) 173 { 174 logging_warn("Chord: Could not send message to " << remote 175 << ": " << e.what()); 176 } 107 177 } 108 178 … … 116 186 OverlayMsg msg( typeDiscovery ); 117 187 msg.setRegisterRelay(true); 118 Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() ); 119 msg.encapsulate(&dmsg); 188 189 // create DiscoveryMessage 190 DiscoveryMessage dmsg; 191 dmsg.type = DiscoveryMessage::normal; 192 dmsg.ttl = ttl; 193 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 194 195 msg.set_payload_message(dmsg.serialize()); 120 196 121 197 // send to node 122 baseoverlay.send_node( &msg, remote ); 198 try 199 { 200 baseoverlay.send_node( &msg, remote, system_priority::OVERLAY ); 201 } 202 catch ( message_not_sent& e ) 203 { 204 logging_warn("Chord: Could not send message to " << remote 205 << ": " << e.what()); 206 } 123 207 } 124 208 125 209 void Chord::discover_neighbors( const LinkID& link ) { 126 210 uint8_t ttl = 1; 211 212 // FIXME try-catch for the send operations 213 214 // create DiscoveryMessage 215 DiscoveryMessage dmsg; 216 dmsg.ttl = ttl; 217 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 127 218 { 128 219 // send predecessor discovery 129 220 OverlayMsg msg( typeDiscovery ); 130 221 msg.setRegisterRelay(true); 131 Discovery dmsg( Discovery::predecessor, ttl, 132 baseoverlay.getEndpointDescriptor() ); 133 msg.encapsulate(&dmsg); 222 223 // set type 224 dmsg.type = DiscoveryMessage::predecessor; 225 226 // send 227 msg.set_payload_message(dmsg.serialize()); 134 228 send(&msg, link); 135 229 } … … 137 231 // send successor discovery 138 232 OverlayMsg msg( typeDiscovery ); 139 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); 233 // msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it? 140 234 msg.setRegisterRelay(true); 141 Discovery dmsg( Discovery::successor, ttl, 142 baseoverlay.getEndpointDescriptor() ); 143 msg.encapsulate(&dmsg); 235 236 // set type 237 dmsg.type = DiscoveryMessage::successor; 238 239 // send 240 msg.set_payload_message(dmsg.serialize()); 144 241 send(&msg, link); 145 242 } … … 163 260 164 261 // timer for stabilization management 165 Timer::setInterval(1000); 262 // Timer::setInterval(1000); // TODO find an appropriate interval! 263 Timer::setInterval(10000); // XXX testing... 166 264 Timer::start(); 167 265 } … … 200 298 return item->info; 201 299 } 300 301 std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode( 302 const NodeID& id, int num ) const 303 { 304 std::vector<const LinkID*> ret; 305 306 switch ( num ) 307 { 308 // special case: just call »getNextLinkId« 309 case 1: 310 { 311 ret.push_back(&getNextLinkId(id)); 312 313 break; 314 } 315 316 // * calculate top 2 * 317 case 0: 318 case 2: 319 { 320 std::vector<const route_item*> items = table->get_next_2_hops(id); 321 322 ret.reserve(items.size()); 323 324 BOOST_FOREACH( const route_item* item, items ) 325 { 326 ret.push_back(&item->info); 327 } 328 329 break; 330 } 331 332 // NOTE: implement real sorting, if needed (and handle "case 0" properly, then) 333 default: 334 { 335 throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)"); 336 337 break; 338 } 339 } 340 341 return ret; 342 } 343 202 344 203 345 /// @see OverlayInterface.h … … 253 395 if (remote==nodeid) { 254 396 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)"); 397 logging_warn("NodeID: " << remote); 255 398 baseoverlay.dropLink(lnk); 256 399 return; … … 290 433 /// @see CommunicationListener.h or @see OverlayInterface.h 291 434 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { 292 logging_debug("link_down: link=" << lnk.toString() << " remote=" << 435 // XXX logging_debug 436 logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" << 293 437 remote.toString() ); 294 438 … … 303 447 /// @see CommunicationListener.h 304 448 /// @see OverlayInterface.h 305 void Chord::onMessage(const DataMessage& msg, const NodeID& remote, 449 void Chord::onMessage(OverlayMsg* msg, 450 reboost::shared_buffer_t sub_msg, 451 const NodeID& remote, 306 452 const LinkID& link) { 307 453 308 // decode message309 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());310 if (m == NULL) return;311 312 454 // handle messages 313 switch ((signalMessageTypes) m->getType()) {455 switch ((signalMessageTypes) msg->getType()) { 314 456 315 457 // discovery request 316 case typeDiscovery: { 317 // decapsulate message 318 Discovery* dmsg = m->decapsulate<Discovery> (); 458 case typeDiscovery: 459 { 460 // deserialize discovery message 461 DiscoveryMessage dmsg; 462 dmsg.deserialize(sub_msg); 463 319 464 logging_debug("Received discovery message with" 320 << " src=" << m ->getSourceNode().toString()321 << " dst=" << m ->getDestinationNode().toString()322 << " ttl=" << (int)dmsg ->getTTL()323 << " type=" << (int)dmsg ->getType()465 << " src=" << msg->getSourceNode().toString() 466 << " dst=" << msg->getDestinationNode().toString() 467 << " ttl=" << (int)dmsg.ttl 468 << " type=" << (int)dmsg.type 324 469 ); 325 470 … … 327 472 bool found = false; 328 473 BOOST_FOREACH( NodeID& value, discovery ) 329 if (value == m ->getSourceNode()) {474 if (value == msg->getSourceNode()) { 330 475 found = true; 331 476 break; 332 477 } 333 if (!found) discovery.push_back(m ->getSourceNode());478 if (!found) discovery.push_back(msg->getSourceNode()); 334 479 335 480 // check if source node can be added to routing table and setup link 336 if (m ->getSourceNode() != nodeid)337 setup( dmsg ->getEndpoint(), m->getSourceNode() );481 if (msg->getSourceNode() != nodeid) 482 setup( dmsg.endpoint, msg->getSourceNode() ); 338 483 339 484 // process discovery message -------------------------- switch start -- 340 switch (dmsg->getType()) { 341 342 // normal: route discovery message like every other message 343 case Discovery::normal: { 344 // closest node? yes-> split to follow successor and predecessor 345 if ( table->is_closest_to(m->getDestinationNode()) ) { 346 logging_debug("Discovery split:"); 347 if (!table->get_successor()->isUnspecified()) { 348 OverlayMsg omsg(*m); 349 dmsg->setType(Discovery::successor); 350 omsg.encapsulate(dmsg); 351 logging_debug("* Routing to successor " 352 << table->get_successor()->toString() ); 353 baseoverlay.send( &omsg, *table->get_successor() ); 354 } 355 356 // send predecessor message 357 if (!table->get_predesessor()->isUnspecified()) { 358 OverlayMsg omsg(*m); 359 dmsg->setType(Discovery::predecessor); 360 omsg.encapsulate(dmsg); 361 logging_debug("* Routing to predecessor " 362 << table->get_predesessor()->toString() ); 363 baseoverlay.send( &omsg, *table->get_predesessor() ); 364 } 365 } 366 // no-> route message 367 else { 368 baseoverlay.route( m ); 369 } 370 break; 371 } 372 373 // successor mode: follow the successor until TTL is zero 374 case Discovery::successor: 375 case Discovery::predecessor: { 376 // reached destination? no->forward! 377 if (m->getDestinationNode() != nodeid) { 378 OverlayMsg omsg(*m); 379 omsg.encapsulate(dmsg); 380 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 381 baseoverlay.route( &omsg ); 382 break; 383 } 384 385 // time to live ended? yes-> stop routing 386 if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break; 387 388 // decrease time-to-live 389 dmsg->setTTL(dmsg->getTTL() - 1); 390 391 const route_item* item = NULL; 392 if (dmsg->getType() == Discovery::successor && 393 table->get_successor() != NULL) { 394 item = table->get(*table->get_successor()); 395 } else { 396 if (table->get_predesessor()!=NULL) 397 item = table->get(*table->get_predesessor()); 398 } 399 if (item == NULL) 400 break; 401 402 logging_debug("Routing discovery message to succ/pred " 403 << item->id.toString() ); 404 OverlayMsg omsg(*m); 405 omsg.encapsulate(dmsg); 406 omsg.setDestinationNode(item->id); 407 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 408 baseoverlay.send(&omsg, omsg.getDestinationNode()); 409 break; 410 } 411 case Discovery::invalid: 412 break; 413 414 default: 415 break; 416 } 417 // process discovery message ---------------------------- switch end -- 418 419 delete dmsg; 420 break; 421 } 422 423 // leave 424 case typeLeave: { 425 if (link!=LinkID::UNSPECIFIED) { 426 route_item* item = table->get(remote); 427 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 428 table->remove(remote); 429 baseoverlay.dropLink(link); 430 } 431 break; 432 }} 485 switch ( dmsg.type ) 486 { 487 // normal: route discovery message like every other message 488 case DiscoveryMessage::normal: 489 { 490 // closest node? yes-> split to follow successor and predecessor 491 if ( table->is_closest_to(msg->getDestinationNode()) ) 492 { 493 logging_debug("Discovery split:"); 494 if (!table->get_successor()->isUnspecified()) 495 { 496 OverlayMsg omsg(*msg); 497 498 dmsg.type = DiscoveryMessage::successor; 499 omsg.set_payload_message(dmsg.serialize()); 500 501 logging_debug("* Routing to successor " 502 << table->get_successor()->toString() ); 503 send_node( &omsg, *table->get_successor() ); 504 } 505 506 // send predecessor message 507 if (!table->get_predesessor()->isUnspecified()) 508 { 509 OverlayMsg omsg(*msg); 510 511 dmsg.type = DiscoveryMessage::predecessor; 512 omsg.set_payload_message(dmsg.serialize()); 513 514 logging_debug("* Routing to predecessor " 515 << table->get_predesessor()->toString() ); 516 send_node( &omsg, *table->get_predesessor() ); 517 } 518 } 519 // no-> route message 520 else 521 { 522 baseoverlay.route( msg ); 523 } 524 break; 525 } 526 527 // successor mode: follow the successor until TTL is zero 528 case DiscoveryMessage::successor: 529 case DiscoveryMessage::predecessor: 530 { 531 // reached destination? no->forward! 532 if (msg->getDestinationNode() != nodeid) 533 { 534 OverlayMsg omsg(*msg); 535 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 536 537 omsg.set_payload_message(dmsg.serialize()); 538 539 baseoverlay.route( &omsg ); 540 break; 541 } 542 543 // time to live ended? yes-> stop routing 544 if (dmsg.ttl == 0 || dmsg.ttl > 10) break; 545 546 // decrease time-to-live 547 dmsg.ttl--; 548 549 const route_item* item = NULL; 550 if (dmsg.type == DiscoveryMessage::successor && 551 table->get_successor() != NULL) 552 { 553 item = table->get(*table->get_successor()); 554 } 555 else if (table->get_predesessor() != NULL) 556 { 557 item = table->get(*table->get_predesessor()); 558 } 559 if (item == NULL) 560 break; 561 562 logging_debug("Routing discovery message to succ/pred " 563 << item->id.toString() ); 564 OverlayMsg omsg(*msg); 565 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 566 omsg.setDestinationNode(item->id); 567 568 omsg.set_payload_message(dmsg.serialize()); 569 570 send_node( &omsg, omsg.getDestinationNode() ); 571 break; 572 } 573 case DiscoveryMessage::invalid: 574 break; 575 576 default: 577 break; 578 } 579 // process discovery message ---------------------------- switch end -- 580 581 break; 582 } 583 584 // leave 585 case typeLeave: { 586 if (link!=LinkID::UNSPECIFIED) { 587 route_item* item = table->get(remote); 588 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 589 table->remove(remote); 590 baseoverlay.dropLink(link); 591 } 592 break; 593 } 594 } 433 595 } 434 596 -
source/ariba/overlay/modules/chord/Chord.h
r10572 r12060 45 45 #include "../OverlayInterface.h" 46 46 #include <vector> 47 #include <stdexcept> 47 48 48 49 class chord_routing_table; … … 87 88 const NodeID& node = NodeID::UNSPECIFIED ); 88 89 89 // helper: sends a message using the "base overlay"90 seqnum_tsend( OverlayMsg* msg, const LinkID& link );90 // helper: sends a message over a link using the "base overlay" 91 void send( OverlayMsg* msg, const LinkID& link ); 91 92 93 // helper: sends a message to a node using the "base overlay" 94 void send_node( OverlayMsg* message, const NodeID& remote ); 95 92 96 // stabilization: sends a discovery message to the specified neighborhood 93 97 void send_discovery_to( const NodeID& destination, int ttl = 3 ); … … 105 109 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 106 110 111 /// @see OverlayInterface.h 112 /// NOTE: This implementation excepts num == 2 113 virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode( 114 const NodeID& id, int num = 0 ) const; 115 107 116 /// @see OverlayInterface.h 108 117 virtual const NodeID& getNextNodeId( const NodeID& id ) const; … … 138 147 139 148 /// @see CommunicationListener.h or @see OverlayInterface.h 140 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 149 virtual void onMessage(OverlayMsg* msg, 150 reboost::shared_buffer_t sub_msg, 151 const NodeID& remote, 141 152 const LinkID& lnk = LinkID::UNSPECIFIED); 142 153 -
source/ariba/overlay/modules/chord/detail/chord_routing_table.hpp
r8606 r12060 96 96 // the own node id 97 97 nodeid_t id; 98 99 // the own node id as routing item 100 item own_id_item; 98 101 99 102 // successor and predecessor tables … … 168 171 /// constructs the reactive chord routing table 169 172 explicit chord_routing_table( const nodeid_t& id, int redundancy = 4 ) : 170 id(id), succ( redundancy, succ_compare_type(this->id), *this ), 171 pred( redundancy, pred_compare_type(this->id), *this ) { 173 id(id), 174 succ( redundancy, succ_compare_type(this->id), *this ), 175 pred( redundancy, pred_compare_type(this->id), *this ) 176 { 177 // init reflexive item 178 own_id_item.id = id; 179 own_id_item.ref_count = 1; 172 180 173 181 // create finger tables … … 273 281 } 274 282 } 283 275 284 if (best_item != NULL && distance(value, id)<distance(value, best_item->id)) 276 285 return NULL; 277 286 return best_item; 278 287 } 288 289 std::vector<const item*> get_next_2_hops( const nodeid_t& value) 290 { 291 ring_distance distance; 292 item* best_item = &own_id_item; 293 item* second_best_item = NULL; 294 295 // find best and second best item 296 for (size_t i=0; i<table.size(); i++) 297 { 298 item* curr = &table[i]; 299 300 // not not include orphans into routing! 301 if (curr->ref_count==0) continue; 302 303 // check if we found a better item 304 // is best item 305 if ( distance(value, curr->id) < distance(value, best_item->id) ) 306 { 307 second_best_item = best_item; 308 best_item = curr; 309 } 310 // is second best item 311 else 312 { 313 if ( second_best_item == NULL ) 314 { 315 second_best_item = curr; 316 continue; 317 } 318 319 if ( distance(value, curr->id) < distance(value, second_best_item->id) ) 320 { 321 second_best_item = curr; 322 } 323 } 324 } 325 326 // prepare return vector 327 std::vector<const item*> ret; 328 if ( best_item != NULL ) 329 { 330 ret.push_back(best_item); 331 } 332 if ( second_best_item != NULL ) 333 { 334 ret.push_back(second_best_item); 335 } 336 337 return ret; 338 } 279 339 280 340 const nodeid_t* get_successor() { 281 if (succ.size()== NULL) return NULL;341 if (succ.size()==0) return NULL; 282 342 return &succ.front(); 283 343 } 284 344 285 345 const nodeid_t* get_predesessor() { 286 if (pred.size()== NULL) return NULL;346 if (pred.size()==0) return NULL; 287 347 return &pred.front(); 288 348 } -
source/ariba/overlay/modules/chord/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(Discovery.h)39 #add_headers(Discovery.h) 40 40 41 add_sources(Discovery.cpp)41 #add_sources(Discovery.cpp) -
source/ariba/overlay/modules/chord/messages/Discovery.h
r5870 r12060 59 59 using_serialization; 60 60 61 // XXX This whole message is DEPRECATED 61 62 class Discovery : public Message { 62 63 VSERIALIZEABLE; -
source/ariba/overlay/modules/onehop/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(OneHop.h)40 39 41 add_sources(OneHop.cpp) 40 ## DEPRECATED 41 #add_headers(OneHop.h) 42 42 43 add_subdir_sources(messages) 43 #add_sources(OneHop.cpp) 44 45 #add_subdir_sources(messages)
Note:
See TracChangeset
for help on using the changeset viewer.