Changeset 12060 for source/ariba/overlay/modules/chord
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (11 years ago)
- Location:
- source/ariba/overlay/modules/chord
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/modules/chord/Chord.cpp
r10572 r12060 43 43 #include "detail/chord_routing_table.hpp" 44 44 45 #include "messages/Discovery.h" 45 //#include "messages/Discovery.h" // XXX DEPRECATED 46 46 47 47 namespace ariba { … … 55 55 typedef chord_routing_table::item route_item; 56 56 57 using ariba::transport::system_priority; 58 57 59 use_logging_cpp( Chord ); 60 61 62 ////// Messages 63 struct DiscoveryMessage 64 { 65 /** 66 * DiscoveryMessage 67 * - type 68 * - data 69 * - Endpoint 70 */ 71 72 // type enum 73 enum type_ { 74 invalid = 0, 75 normal = 1, 76 successor = 2, 77 predecessor = 3 78 }; 79 80 81 // data 82 uint8_t type; 83 uint8_t ttl; 84 EndpointDescriptor endpoint; 85 86 // serialize 87 reboost::message_t serialize() 88 { 89 // serialize endpoint 90 reboost::message_t msg = endpoint.serialize(); 91 92 // serialize type and ttl 93 uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data(); 94 buff1[0] = type; 95 buff1[1] = ttl; 96 97 return msg; 98 } 99 100 //deserialize 101 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff) 102 { 103 // deserialize type and ttl 104 const uint8_t* bytes = buff.data(); 105 type = bytes[0]; 106 ttl = bytes[1]; 107 108 // deserialize endpoint 109 return endpoint.deserialize(buff(2*sizeof(uint8_t))); 110 } 111 }; 112 58 113 59 114 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, … … 102 157 103 158 /// helper: sends a message using the "base overlay" 104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) { 105 if (link.isUnspecified()) return 0; 106 return baseoverlay.send_link( msg, link ); 159 void Chord::send( OverlayMsg* msg, const LinkID& link ) { 160 if (link.isUnspecified()) 161 return; 162 163 baseoverlay.send_link( msg, link, system_priority::OVERLAY ); 164 } 165 166 void Chord::send_node( OverlayMsg* message, const NodeID& remote ) 167 { 168 try 169 { 170 baseoverlay.send( message, remote, system_priority::OVERLAY ); 171 } 172 catch ( message_not_sent& e ) 173 { 174 logging_warn("Chord: Could not send message to " << remote 175 << ": " << e.what()); 176 } 107 177 } 108 178 … … 116 186 OverlayMsg msg( typeDiscovery ); 117 187 msg.setRegisterRelay(true); 118 Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() ); 119 msg.encapsulate(&dmsg); 188 189 // create DiscoveryMessage 190 DiscoveryMessage dmsg; 191 dmsg.type = DiscoveryMessage::normal; 192 dmsg.ttl = ttl; 193 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 194 195 msg.set_payload_message(dmsg.serialize()); 120 196 121 197 // send to node 122 baseoverlay.send_node( &msg, remote ); 198 try 199 { 200 baseoverlay.send_node( &msg, remote, system_priority::OVERLAY ); 201 } 202 catch ( message_not_sent& e ) 203 { 204 logging_warn("Chord: Could not send message to " << remote 205 << ": " << e.what()); 206 } 123 207 } 124 208 125 209 void Chord::discover_neighbors( const LinkID& link ) { 126 210 uint8_t ttl = 1; 211 212 // FIXME try-catch for the send operations 213 214 // create DiscoveryMessage 215 DiscoveryMessage dmsg; 216 dmsg.ttl = ttl; 217 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 127 218 { 128 219 // send predecessor discovery 129 220 OverlayMsg msg( typeDiscovery ); 130 221 msg.setRegisterRelay(true); 131 Discovery dmsg( Discovery::predecessor, ttl, 132 baseoverlay.getEndpointDescriptor() ); 133 msg.encapsulate(&dmsg); 222 223 // set type 224 dmsg.type = DiscoveryMessage::predecessor; 225 226 // send 227 msg.set_payload_message(dmsg.serialize()); 134 228 send(&msg, link); 135 229 } … … 137 231 // send successor discovery 138 232 OverlayMsg msg( typeDiscovery ); 139 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); 233 // msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it? 140 234 msg.setRegisterRelay(true); 141 Discovery dmsg( Discovery::successor, ttl, 142 baseoverlay.getEndpointDescriptor() ); 143 msg.encapsulate(&dmsg); 235 236 // set type 237 dmsg.type = DiscoveryMessage::successor; 238 239 // send 240 msg.set_payload_message(dmsg.serialize()); 144 241 send(&msg, link); 145 242 } … … 163 260 164 261 // timer for stabilization management 165 Timer::setInterval(1000); 262 // Timer::setInterval(1000); // TODO find an appropriate interval! 263 Timer::setInterval(10000); // XXX testing... 166 264 Timer::start(); 167 265 } … … 200 298 return item->info; 201 299 } 300 301 std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode( 302 const NodeID& id, int num ) const 303 { 304 std::vector<const LinkID*> ret; 305 306 switch ( num ) 307 { 308 // special case: just call »getNextLinkId« 309 case 1: 310 { 311 ret.push_back(&getNextLinkId(id)); 312 313 break; 314 } 315 316 // * calculate top 2 * 317 case 0: 318 case 2: 319 { 320 std::vector<const route_item*> items = table->get_next_2_hops(id); 321 322 ret.reserve(items.size()); 323 324 BOOST_FOREACH( const route_item* item, items ) 325 { 326 ret.push_back(&item->info); 327 } 328 329 break; 330 } 331 332 // NOTE: implement real sorting, if needed (and handle "case 0" properly, then) 333 default: 334 { 335 throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)"); 336 337 break; 338 } 339 } 340 341 return ret; 342 } 343 202 344 203 345 /// @see OverlayInterface.h … … 253 395 if (remote==nodeid) { 254 396 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)"); 397 logging_warn("NodeID: " << remote); 255 398 baseoverlay.dropLink(lnk); 256 399 return; … … 290 433 /// @see CommunicationListener.h or @see OverlayInterface.h 291 434 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { 292 logging_debug("link_down: link=" << lnk.toString() << " remote=" << 435 // XXX logging_debug 436 logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" << 293 437 remote.toString() ); 294 438 … … 303 447 /// @see CommunicationListener.h 304 448 /// @see OverlayInterface.h 305 void Chord::onMessage(const DataMessage& msg, const NodeID& remote, 449 void Chord::onMessage(OverlayMsg* msg, 450 reboost::shared_buffer_t sub_msg, 451 const NodeID& remote, 306 452 const LinkID& link) { 307 453 308 // decode message309 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());310 if (m == NULL) return;311 312 454 // handle messages 313 switch ((signalMessageTypes) m->getType()) {455 switch ((signalMessageTypes) msg->getType()) { 314 456 315 457 // discovery request 316 case typeDiscovery: { 317 // decapsulate message 318 Discovery* dmsg = m->decapsulate<Discovery> (); 458 case typeDiscovery: 459 { 460 // deserialize discovery message 461 DiscoveryMessage dmsg; 462 dmsg.deserialize(sub_msg); 463 319 464 logging_debug("Received discovery message with" 320 << " src=" << m ->getSourceNode().toString()321 << " dst=" << m ->getDestinationNode().toString()322 << " ttl=" << (int)dmsg ->getTTL()323 << " type=" << (int)dmsg ->getType()465 << " src=" << msg->getSourceNode().toString() 466 << " dst=" << msg->getDestinationNode().toString() 467 << " ttl=" << (int)dmsg.ttl 468 << " type=" << (int)dmsg.type 324 469 ); 325 470 … … 327 472 bool found = false; 328 473 BOOST_FOREACH( NodeID& value, discovery ) 329 if (value == m ->getSourceNode()) {474 if (value == msg->getSourceNode()) { 330 475 found = true; 331 476 break; 332 477 } 333 if (!found) discovery.push_back(m ->getSourceNode());478 if (!found) discovery.push_back(msg->getSourceNode()); 334 479 335 480 // check if source node can be added to routing table and setup link 336 if (m ->getSourceNode() != nodeid)337 setup( dmsg ->getEndpoint(), m->getSourceNode() );481 if (msg->getSourceNode() != nodeid) 482 setup( dmsg.endpoint, msg->getSourceNode() ); 338 483 339 484 // process discovery message -------------------------- switch start -- 340 switch (dmsg->getType()) { 341 342 // normal: route discovery message like every other message 343 case Discovery::normal: { 344 // closest node? yes-> split to follow successor and predecessor 345 if ( table->is_closest_to(m->getDestinationNode()) ) { 346 logging_debug("Discovery split:"); 347 if (!table->get_successor()->isUnspecified()) { 348 OverlayMsg omsg(*m); 349 dmsg->setType(Discovery::successor); 350 omsg.encapsulate(dmsg); 351 logging_debug("* Routing to successor " 352 << table->get_successor()->toString() ); 353 baseoverlay.send( &omsg, *table->get_successor() ); 354 } 355 356 // send predecessor message 357 if (!table->get_predesessor()->isUnspecified()) { 358 OverlayMsg omsg(*m); 359 dmsg->setType(Discovery::predecessor); 360 omsg.encapsulate(dmsg); 361 logging_debug("* Routing to predecessor " 362 << table->get_predesessor()->toString() ); 363 baseoverlay.send( &omsg, *table->get_predesessor() ); 364 } 365 } 366 // no-> route message 367 else { 368 baseoverlay.route( m ); 369 } 370 break; 371 } 372 373 // successor mode: follow the successor until TTL is zero 374 case Discovery::successor: 375 case Discovery::predecessor: { 376 // reached destination? no->forward! 377 if (m->getDestinationNode() != nodeid) { 378 OverlayMsg omsg(*m); 379 omsg.encapsulate(dmsg); 380 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 381 baseoverlay.route( &omsg ); 382 break; 383 } 384 385 // time to live ended? yes-> stop routing 386 if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break; 387 388 // decrease time-to-live 389 dmsg->setTTL(dmsg->getTTL() - 1); 390 391 const route_item* item = NULL; 392 if (dmsg->getType() == Discovery::successor && 393 table->get_successor() != NULL) { 394 item = table->get(*table->get_successor()); 395 } else { 396 if (table->get_predesessor()!=NULL) 397 item = table->get(*table->get_predesessor()); 398 } 399 if (item == NULL) 400 break; 401 402 logging_debug("Routing discovery message to succ/pred " 403 << item->id.toString() ); 404 OverlayMsg omsg(*m); 405 omsg.encapsulate(dmsg); 406 omsg.setDestinationNode(item->id); 407 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 408 baseoverlay.send(&omsg, omsg.getDestinationNode()); 409 break; 410 } 411 case Discovery::invalid: 412 break; 413 414 default: 415 break; 416 } 417 // process discovery message ---------------------------- switch end -- 418 419 delete dmsg; 420 break; 421 } 422 423 // leave 424 case typeLeave: { 425 if (link!=LinkID::UNSPECIFIED) { 426 route_item* item = table->get(remote); 427 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 428 table->remove(remote); 429 baseoverlay.dropLink(link); 430 } 431 break; 432 }} 485 switch ( dmsg.type ) 486 { 487 // normal: route discovery message like every other message 488 case DiscoveryMessage::normal: 489 { 490 // closest node? yes-> split to follow successor and predecessor 491 if ( table->is_closest_to(msg->getDestinationNode()) ) 492 { 493 logging_debug("Discovery split:"); 494 if (!table->get_successor()->isUnspecified()) 495 { 496 OverlayMsg omsg(*msg); 497 498 dmsg.type = DiscoveryMessage::successor; 499 omsg.set_payload_message(dmsg.serialize()); 500 501 logging_debug("* Routing to successor " 502 << table->get_successor()->toString() ); 503 send_node( &omsg, *table->get_successor() ); 504 } 505 506 // send predecessor message 507 if (!table->get_predesessor()->isUnspecified()) 508 { 509 OverlayMsg omsg(*msg); 510 511 dmsg.type = DiscoveryMessage::predecessor; 512 omsg.set_payload_message(dmsg.serialize()); 513 514 logging_debug("* Routing to predecessor " 515 << table->get_predesessor()->toString() ); 516 send_node( &omsg, *table->get_predesessor() ); 517 } 518 } 519 // no-> route message 520 else 521 { 522 baseoverlay.route( msg ); 523 } 524 break; 525 } 526 527 // successor mode: follow the successor until TTL is zero 528 case DiscoveryMessage::successor: 529 case DiscoveryMessage::predecessor: 530 { 531 // reached destination? no->forward! 532 if (msg->getDestinationNode() != nodeid) 533 { 534 OverlayMsg omsg(*msg); 535 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 536 537 omsg.set_payload_message(dmsg.serialize()); 538 539 baseoverlay.route( &omsg ); 540 break; 541 } 542 543 // time to live ended? yes-> stop routing 544 if (dmsg.ttl == 0 || dmsg.ttl > 10) break; 545 546 // decrease time-to-live 547 dmsg.ttl--; 548 549 const route_item* item = NULL; 550 if (dmsg.type == DiscoveryMessage::successor && 551 table->get_successor() != NULL) 552 { 553 item = table->get(*table->get_successor()); 554 } 555 else if (table->get_predesessor() != NULL) 556 { 557 item = table->get(*table->get_predesessor()); 558 } 559 if (item == NULL) 560 break; 561 562 logging_debug("Routing discovery message to succ/pred " 563 << item->id.toString() ); 564 OverlayMsg omsg(*msg); 565 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 566 omsg.setDestinationNode(item->id); 567 568 omsg.set_payload_message(dmsg.serialize()); 569 570 send_node( &omsg, omsg.getDestinationNode() ); 571 break; 572 } 573 case DiscoveryMessage::invalid: 574 break; 575 576 default: 577 break; 578 } 579 // process discovery message ---------------------------- switch end -- 580 581 break; 582 } 583 584 // leave 585 case typeLeave: { 586 if (link!=LinkID::UNSPECIFIED) { 587 route_item* item = table->get(remote); 588 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 589 table->remove(remote); 590 baseoverlay.dropLink(link); 591 } 592 break; 593 } 594 } 433 595 } 434 596 -
source/ariba/overlay/modules/chord/Chord.h
r10572 r12060 45 45 #include "../OverlayInterface.h" 46 46 #include <vector> 47 #include <stdexcept> 47 48 48 49 class chord_routing_table; … … 87 88 const NodeID& node = NodeID::UNSPECIFIED ); 88 89 89 // helper: sends a message using the "base overlay"90 seqnum_tsend( OverlayMsg* msg, const LinkID& link );90 // helper: sends a message over a link using the "base overlay" 91 void send( OverlayMsg* msg, const LinkID& link ); 91 92 93 // helper: sends a message to a node using the "base overlay" 94 void send_node( OverlayMsg* message, const NodeID& remote ); 95 92 96 // stabilization: sends a discovery message to the specified neighborhood 93 97 void send_discovery_to( const NodeID& destination, int ttl = 3 ); … … 105 109 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 106 110 111 /// @see OverlayInterface.h 112 /// NOTE: This implementation excepts num == 2 113 virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode( 114 const NodeID& id, int num = 0 ) const; 115 107 116 /// @see OverlayInterface.h 108 117 virtual const NodeID& getNextNodeId( const NodeID& id ) const; … … 138 147 139 148 /// @see CommunicationListener.h or @see OverlayInterface.h 140 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 149 virtual void onMessage(OverlayMsg* msg, 150 reboost::shared_buffer_t sub_msg, 151 const NodeID& remote, 141 152 const LinkID& lnk = LinkID::UNSPECIFIED); 142 153 -
source/ariba/overlay/modules/chord/detail/chord_routing_table.hpp
r8606 r12060 96 96 // the own node id 97 97 nodeid_t id; 98 99 // the own node id as routing item 100 item own_id_item; 98 101 99 102 // successor and predecessor tables … … 168 171 /// constructs the reactive chord routing table 169 172 explicit chord_routing_table( const nodeid_t& id, int redundancy = 4 ) : 170 id(id), succ( redundancy, succ_compare_type(this->id), *this ), 171 pred( redundancy, pred_compare_type(this->id), *this ) { 173 id(id), 174 succ( redundancy, succ_compare_type(this->id), *this ), 175 pred( redundancy, pred_compare_type(this->id), *this ) 176 { 177 // init reflexive item 178 own_id_item.id = id; 179 own_id_item.ref_count = 1; 172 180 173 181 // create finger tables … … 273 281 } 274 282 } 283 275 284 if (best_item != NULL && distance(value, id)<distance(value, best_item->id)) 276 285 return NULL; 277 286 return best_item; 278 287 } 288 289 std::vector<const item*> get_next_2_hops( const nodeid_t& value) 290 { 291 ring_distance distance; 292 item* best_item = &own_id_item; 293 item* second_best_item = NULL; 294 295 // find best and second best item 296 for (size_t i=0; i<table.size(); i++) 297 { 298 item* curr = &table[i]; 299 300 // not not include orphans into routing! 301 if (curr->ref_count==0) continue; 302 303 // check if we found a better item 304 // is best item 305 if ( distance(value, curr->id) < distance(value, best_item->id) ) 306 { 307 second_best_item = best_item; 308 best_item = curr; 309 } 310 // is second best item 311 else 312 { 313 if ( second_best_item == NULL ) 314 { 315 second_best_item = curr; 316 continue; 317 } 318 319 if ( distance(value, curr->id) < distance(value, second_best_item->id) ) 320 { 321 second_best_item = curr; 322 } 323 } 324 } 325 326 // prepare return vector 327 std::vector<const item*> ret; 328 if ( best_item != NULL ) 329 { 330 ret.push_back(best_item); 331 } 332 if ( second_best_item != NULL ) 333 { 334 ret.push_back(second_best_item); 335 } 336 337 return ret; 338 } 279 339 280 340 const nodeid_t* get_successor() { 281 if (succ.size()== NULL) return NULL;341 if (succ.size()==0) return NULL; 282 342 return &succ.front(); 283 343 } 284 344 285 345 const nodeid_t* get_predesessor() { 286 if (pred.size()== NULL) return NULL;346 if (pred.size()==0) return NULL; 287 347 return &pred.front(); 288 348 } -
source/ariba/overlay/modules/chord/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(Discovery.h)39 #add_headers(Discovery.h) 40 40 41 add_sources(Discovery.cpp)41 #add_sources(Discovery.cpp) -
source/ariba/overlay/modules/chord/messages/Discovery.h
r5870 r12060 59 59 using_serialization; 60 60 61 // XXX This whole message is DEPRECATED 61 62 class Discovery : public Message { 62 63 VSERIALIZEABLE;
Note:
See TracChangeset
for help on using the changeset viewer.