Changeset 5870 for source/ariba/overlay/modules
- Timestamp:
- Aug 11, 2009, 4:11:02 PM (15 years ago)
- Location:
- source/ariba/overlay/modules
- Files:
-
- 2 deleted
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/modules/OverlayInterface.h
r5624 r5870 74 74 * Constructs a new overlay. 75 75 */ 76 OverlayInterface( 77 BaseOverlay& _baseoverlay, 78 const NodeID& _nodeid, 79 OverlayStructureEvents* _eventsReceiver, 80 OverlayParameterSet _parameters 81 ); 76 OverlayInterface( BaseOverlay& _baseoverlay, const NodeID& _nodeid, 77 OverlayStructureEvents* _eventsReceiver, OverlayParameterSet _parameters 78 ); 82 79 83 80 /** … … 103 100 * end-point, if this node is the initiator 104 101 */ 105 virtual void joinOverlay(const EndpointDescriptor& bootstrap = EndpointDescriptor::UNSPECIFIED()) = 0; 102 virtual void joinOverlay( 103 const EndpointDescriptor& bootstrap = EndpointDescriptor::UNSPECIFIED()) = 0; 106 104 107 105 /** … … 117 115 */ 118 116 virtual const EndpointDescriptor& resolveNode(const NodeID& node) = 0; 119 120 /**121 * Routes a message to a given node by using overlay routing.122 *123 * @param destnode The destination node.124 * @param msg The message to be routed.125 */126 virtual void routeMessage(const NodeID& destnode, Message* msg) = 0;127 128 /**129 * Routes a message to a given node by using an existing link.130 *131 * TODO: This is a hack. This method allows the BaseOverlay class to132 * use overlay signaling links to transfer data for relaying133 *134 * @param node The destination node.135 * @param link An established link136 * @param msg The message to be sent.137 */138 virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg) = 0;139 117 140 118 /** … … 183 161 const LinkID& lnk = LinkID::UNSPECIFIED); 184 162 185 186 163 const OverlayParameterSet& getParameters() const; 187 164 -
source/ariba/overlay/modules/chord/Chord.cpp
r5803 r5870 38 38 39 39 #include "ariba/overlay/BaseOverlay.h" 40 #include "ariba/overlay/messages/OverlayMsg.h" 40 41 41 42 #include "Chord.h" 42 #include "messages/ChordMessage.h" 43 #include "detail/chord_routing_table.hpp" 44 43 45 #include "messages/Discovery.h" 44 45 #include "detail/chord_routing_table.hpp"46 46 47 47 namespace ariba { 48 48 namespace overlay { 49 50 enum signalMessageTypes { 51 typeDiscovery = OverlayMsg::typeSignalingStart + 0x01, 52 typeLeave = OverlayMsg::typeSignalingStart + 0x02, 53 }; 49 54 50 55 typedef chord_routing_table::item route_item; … … 57 62 58 63 // create routing table 64 <<<<<<< .working 59 65 this->table = new chord_routing_table(_nodeid, 2); 66 ======= 67 this->table = new chord_routing_table(_nodeid, 4); 68 >>>>>>> .merge-rechts.r5869 60 69 orphan_removal_counter = 0; 61 70 discovery_count = 0; … … 71 80 72 81 /// helper: sets up a link using the base overlay 73 LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node, const NodeID& remoteRelay ) { 74 logging_debug("Request to setup link to " << endp.toString() ); 82 LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) { 75 83 76 84 // check if we already have a connection 77 85 for (int i=0; i<table->size(); i++) 78 if ((*table)[i]-> id == node && !((*table)[i]->info.isUnspecified()))86 if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified())) 79 87 return LinkID::UNSPECIFIED; 80 88 81 89 // check if we are already trying to establish a link 82 90 for (size_t i=0; i<pending.size(); i++) 83 if ( pending[i] == node ) { 84 logging_debug("Already trying to establish a link to node " << node.toString() ); 91 if ( pending[i] == remote ) { 92 logging_debug("Already trying to establish a link to node " 93 << remote.toString() ); 85 94 return LinkID::UNSPECIFIED; 86 95 } 87 96 88 97 // adding node to list of pending connections 89 pending.push_back( node ); 98 pending.push_back( remote ); 99 100 logging_info("Request to setup link to " << endpoint.toString() ); 90 101 91 102 // establish link via base overlay 103 <<<<<<< .working 92 104 return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID, remoteRelay ); 105 ======= 106 return baseoverlay.establishLink( endpoint, remote, 107 OverlayInterface::OVERLAY_SERVICE_ID ); 108 >>>>>>> .merge-rechts.r5869 93 109 } 94 110 95 111 /// helper: sends a message using the "base overlay" 96 seqnum_t Chord::send( Message* msg, const LinkID& link) {112 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) { 97 113 if (link.isUnspecified()) return 0; 98 return baseoverlay.sendMessage(msg, link); 114 msg->setRelayed(true); 115 return baseoverlay.send_link( msg, link ); 99 116 } 100 117 101 118 /// sends a discovery message 102 void Chord::send_discovery_to(const NodeID& destination, int ttl) { 119 void Chord::send_discovery_to(const NodeID& remote, int ttl) { 120 LinkID link = getNextLinkId(remote); 121 if ( remote == nodeid || link.isUnspecified()) return; 103 122 if ( table->size() == 0 ) return; 104 123 105 ChordMessage cmsg(ChordMessage::discovery, nodeid, destination); 106 Discovery dmsg; 107 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); 108 dmsg.setSourceRelay(baseoverlay.getRelayNode(destination)); 109 dmsg.setFollowType(Discovery::normal); 110 dmsg.setTTL((uint8_t) ttl); 111 cmsg.encapsulate(&dmsg); 112 124 OverlayMsg msg( typeDiscovery ); 125 msg.setRelayed(true); 126 Discovery dmsg( Discovery::normal, (uint8_t)2, baseoverlay.getEndpointDescriptor() ); 127 msg.encapsulate(&dmsg); 128 129 <<<<<<< .working 113 130 // get next hop 114 131 const route_item* item = table->get_next_hop(destination); 115 132 if (item!=NULL && !item->info.isUnspecified()) send(&cmsg, item->info); 116 } 117 118 void Chord::discover_neighbors( const LinkID& lnk ) { 133 ======= 134 // send to node 135 baseoverlay.send_node( &msg, remote ); 136 >>>>>>> .merge-rechts.r5869 137 } 138 139 void Chord::discover_neighbors( const LinkID& link ) { 140 uint8_t ttl = 2; 141 { 142 // send predecessor discovery 143 OverlayMsg msg( typeDiscovery ); 144 msg.setRelayed(true); 145 Discovery dmsg( Discovery::predecessor, ttl, 146 baseoverlay.getEndpointDescriptor() ); 147 msg.encapsulate(&dmsg); 148 send(&msg, link); 149 } 119 150 { 120 151 // send successor discovery 121 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid); 122 Discovery dmsg; 123 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); 124 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid)); 125 dmsg.setFollowType(Discovery::successor); 126 dmsg.setTTL((uint8_t)4); 127 cmsg.encapsulate(&dmsg); 128 send(&cmsg, lnk); 129 } 130 { 131 // send predecessor discovery 132 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid); 133 Discovery dmsg; 134 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); 135 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid)); 136 dmsg.setFollowType(Discovery::predecessor); 137 dmsg.setTTL((uint8_t)4); 138 cmsg.encapsulate(&dmsg); 139 send(&cmsg, lnk); 152 OverlayMsg msg( typeDiscovery ); 153 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); 154 msg.setRelayed(true); 155 Discovery dmsg( Discovery::successor, ttl, 156 baseoverlay.getEndpointDescriptor() ); 157 msg.encapsulate(&dmsg); 158 send(&msg, link); 140 159 } 141 160 } … … 166 185 for (size_t i = 0; i < table->size(); i++) { 167 186 route_item* it = (*table)[i]; 168 ChordMessage msg(ChordMessage::leave, nodeid, it->id); 169 send(&msg,it->info); 170 } 171 } 172 187 OverlayMsg msg( typeLeave ); 188 send( &msg, it->info ); 189 } 190 } 191 192 /// @see OverlayInterface.h 173 193 const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { 174 194 const route_item* item = table->get(node); … … 177 197 } 178 198 179 void Chord::routeMessage(const NodeID& destnode, Message* msg) { 199 /// @see OverlayInterface.h 200 const LinkID& Chord::getNextLinkId( const NodeID& id ) const { 180 201 // get next hop 202 <<<<<<< .working 181 203 const route_item* item = table->get_next_hop(destnode); 182 204 ======= 205 const route_item* item = table->get_next_hop(id); 206 >>>>>>> .merge-rechts.r5869 207 208 <<<<<<< .working 183 209 // message for this node? yes-> delegate to base overlay 184 210 if (item->id == nodeid || destnode == nodeid) 185 211 baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); 186 212 ======= 213 // returns a unspecified id when this is itself 214 if (item == NULL || item->id == nodeid) 215 return LinkID::UNSPECIFIED; 216 >>>>>>> .merge-rechts.r5869 217 218 <<<<<<< .working 187 219 else { // no-> send to next hop 188 220 ChordMessage cmsg(ChordMessage::route, nodeid, destnode); … … 190 222 send(&cmsg, item->info); 191 223 } 192 } 193 224 ======= 225 /// return routing info 226 return item->info; 227 >>>>>>> .merge-rechts.r5869 228 } 229 230 <<<<<<< .working 194 231 /// @see OverlayInterface.h 195 232 void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { … … 214 251 } 215 252 253 ======= 254 >>>>>>> .merge-rechts.r5869 216 255 OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const { 217 256 OverlayInterface::NodeList nodelist; … … 243 282 /// @see OverlayInterface.h 244 283 void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { 245 logging_ debug("link_up: link=" << lnk.toString() << " remote=" <<284 logging_info("link_up: link=" << lnk.toString() << " remote=" << 246 285 remote.toString() ); 247 286 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++) … … 250 289 break; 251 290 } 291 /* 292 // check if we already have a connection, yes-> do not handle duplicate! 293 for (int i=0; i<table->size(); i++) 294 if ((*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()) && (*table)[i]->info != lnk) { 295 296 return; 297 } 298 */ 299 if (remote==nodeid) { 300 baseoverlay.dropLink(lnk); 301 return; 302 } 303 252 304 route_item* item = table->insert(remote); 253 305 … … 257 309 << " with link " << lnk.toString()); 258 310 // replace with new link 259 if (!item->info.isUnspecified() )311 if (!item->info.isUnspecified() || item->info!=lnk) 260 312 baseoverlay.dropLink(item->info); 261 313 item->info = lnk; 314 // discover neighbors of new overlay neighbor 315 discover_neighbors( lnk ); 316 showLinks(); 262 317 } else { // no-> add orphan entry to routing table 263 318 logging_info("new orphan: " << remote.toString() … … 266 321 } 267 322 268 discover_neighbors( lnk ); 269 323 // erase bootstrap link 270 324 vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk); 271 if( it != bootstrapLinks.end() ) { 272 discover_neighbors( lnk ); 273 bootstrapLinks.erase( it ); 274 } 325 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 275 326 } 276 327 … … 282 333 // remove link from routing table 283 334 route_item* item = table->get(remote); 284 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 285 table->remove(remote); 335 if (item!=NULL && item->info==lnk) { 336 item->info = LinkID::UNSPECIFIED; 337 table->remove(remote); 338 } 286 339 } 287 340 … … 292 345 293 346 // decode message 294 typedef ChordMessage M; 295 M* m = msg.getMessage()->convert<ChordMessage> (); 347 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage()); 296 348 if (m == NULL) return; 297 349 … … 299 351 switch (m->getType()) { 300 352 353 <<<<<<< .working 301 354 // invalid message 302 355 case M::invalid: … … 324 377 // discovery request 325 378 case M::discovery: { 379 ======= 380 // discovery request 381 case typeDiscovery: { 382 >>>>>>> .merge-rechts.r5869 326 383 // decapsulate message 327 384 Discovery* dmsg = m->decapsulate<Discovery> (); 328 logging_debug(" received discovery message with"329 << " src=" << m->getSource ().toString()330 << " dst=" << m->getDestination ().toString()385 logging_debug("Received discovery message with" 386 << " src=" << m->getSourceNode().toString() 387 << " dst=" << m->getDestinationNode().toString() 331 388 << " ttl=" << (int)dmsg->getTTL() 332 << " type=" << (int)dmsg->get FollowType()389 << " type=" << (int)dmsg->getType() 333 390 ); 334 391 335 392 // check if source node can be added to routing table and setup link 336 if (m->getSource() != nodeid && table->is_insertable(m->getSource())) 337 setup(*dmsg->getSourceEndpoint(), m->getSource(), dmsg->getSourceRelay() ); 393 if (m->getSourceNode() != nodeid 394 && table->is_insertable(m->getSourceNode())) 395 setup( dmsg->getEndpoint(), m->getSourceNode() ); 338 396 339 397 // delegate discovery message 340 switch (dmsg->get FollowType()) {398 switch (dmsg->getType()) { 341 399 342 400 // normal: route discovery message like every other message 343 401 case Discovery::normal: { 344 402 // closest node? yes-> split to follow successor and predecessor 345 if (table->is_closest_to(m->getDestination()) 346 || (table->get_successor()!=NULL && *table->get_successor() == m->getDestination()) 347 || (table->get_predesessor()!=NULL && *table->get_predesessor() == m->getDestination()) 348 ) { 403 if ( table->is_closest_to(m->getDestinationNode()) ) { 349 404 350 405 if (table->get_successor() != NULL) { 351 // send successor message 352 ChordMessage cmsg_s(*m); 353 Discovery dmsg_s(*dmsg); 354 dmsg_s.setFollowType(Discovery::successor); 355 cmsg_s.encapsulate(&dmsg_s); 406 OverlayMsg omsg(*m); 407 dmsg->setType(Discovery::successor); 408 omsg.encapsulate(dmsg); 356 409 route_item* succ_item = table->get(*table->get_successor()); 357 410 logging_debug("Discovery split: routing discovery message to successor " 358 411 << succ_item->id.toString() ); 412 <<<<<<< .working 359 413 send(&cmsg_s, succ_item->info); 414 ======= 415 send(&omsg, succ_item->info); 416 >>>>>>> .merge-rechts.r5869 360 417 } 361 418 362 419 // send predecessor message 363 420 if (table->get_predesessor() != NULL) { 364 ChordMessage cmsg_p(*m); 365 Discovery dmsg_p(*dmsg); 366 dmsg_p.setFollowType(Discovery::predecessor); 367 cmsg_p.encapsulate(&dmsg_p); 421 OverlayMsg omsg(*m); 422 dmsg->setType(Discovery::predecessor); 423 omsg.encapsulate(dmsg); 368 424 route_item* pred_item = table->get( 369 425 *table->get_predesessor()); 370 426 logging_debug("Discovery split: routing discovery message to predecessor " 371 427 << pred_item->id.toString() ); 428 <<<<<<< .working 372 429 send(&cmsg_p, pred_item->info); 430 ======= 431 send( &omsg, pred_item->info); 432 >>>>>>> .merge-rechts.r5869 373 433 } 374 434 } 375 435 // no-> route message 376 436 else { 437 <<<<<<< .working 377 438 // find next hop 378 439 const route_item* item = table->get_next_hop(m->getDestination()); … … 381 442 item->id.toString() ); 382 443 send(m, item->info); 444 ======= 445 baseoverlay.send( m, m->getDestinationNode() ); 446 >>>>>>> .merge-rechts.r5869 383 447 } 384 448 break; … … 395 459 396 460 const route_item* item = NULL; 397 if (dmsg->get FollowType() == Discovery::successor &&461 if (dmsg->getType() == Discovery::successor && 398 462 table->get_successor() != NULL) { 399 463 item = table->get(*table->get_successor()); … … 405 469 logging_debug("routing discovery message to succ/pred " 406 470 << item->id.toString() ); 407 ChordMessage cmsg(*m);408 Discovery dmsg_p(*dmsg);409 cmsg.encapsulate(&dmsg_p);410 send(&cmsg, item->info);471 OverlayMsg omsg(*m); 472 omsg.encapsulate(dmsg); 473 omsg.setDestinationNode(item->id); 474 baseoverlay.send(&omsg, omsg.getDestinationNode()); 411 475 break; 412 476 }} … … 415 479 } 416 480 417 418 case M::leave: {481 // leave 482 case typeLeave: { 419 483 if (link!=LinkID::UNSPECIFIED) { 420 484 route_item* item = table->get(remote); … … 424 488 } 425 489 break; 426 } 427 } 428 delete m; 490 }} 429 491 } 430 492 … … 464 526 // remove orphan links 465 527 orphan_removal_counter++; 528 <<<<<<< .working 466 529 if (orphan_removal_counter <0 || orphan_removal_counter >= 4) { 530 ======= 531 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) { 532 >>>>>>> .merge-rechts.r5869 467 533 logging_info("Running orphan removal"); 468 534 orphan_removal_counter = 0; … … 473 539 table->insert(it->id); 474 540 if (it->ref_count==0) { 475 baseoverlay.dropLink(it->info);541 LinkID id = it->info; 476 542 it->info = LinkID::UNSPECIFIED; 543 baseoverlay.dropLink(id); 477 544 } 478 545 } … … 480 547 } 481 548 } 549 } 550 551 void Chord::showLinks() { 482 552 logging_info("--- chord routing information ----------------------------------"); 483 553 logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" : -
source/ariba/overlay/modules/chord/Chord.h
r5803 r5870 51 51 namespace overlay { 52 52 53 class OverlayMsg; 54 53 55 using ariba::communication::EndpointDescriptor; 54 56 using ariba::utility::Timer; … … 82 84 // helper: sets up a link using the "base overlay" 83 85 LinkID setup( const EndpointDescriptor& endp, 84 const NodeID& node = NodeID::UNSPECIFIED, 85 const NodeID& remoteRelay = NodeID::UNSPECIFIED ); 86 const NodeID& node = NodeID::UNSPECIFIED ); 86 87 87 88 // helper: sends a message using the "base overlay" 88 seqnum_t send( Message* msg, const LinkID& link );89 seqnum_t send( OverlayMsg* msg, const LinkID& link ); 89 90 90 91 // stabilization: sends a discovery message to the specified neighborhood … … 92 93 93 94 void discover_neighbors( const LinkID& lnk ); 95 96 void showLinks(); 94 97 95 98 public: … … 119 122 120 123 /// @see OverlayInterface.h 121 virtual void routeMessage( const NodeID& destnode, Message* msg );122 123 /// @see OverlayInterface.h124 virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg);125 126 /// @see OverlayInterface.h127 124 virtual NodeList getKnownNodes(bool deep = true) const; 128 125 … … 137 134 const LinkID& lnk = LinkID::UNSPECIFIED); 138 135 139 140 141 136 /// @see Timer.h 142 137 virtual void eventFunction(); 138 143 139 }; 144 140 -
source/ariba/overlay/modules/chord/messages/Discovery.cpp
r5681 r5870 44 44 vsznDefault(Discovery); 45 45 46 Discovery::Discovery(){47 }48 49 46 Discovery::~Discovery(){ 50 47 } -
source/ariba/overlay/modules/chord/messages/Discovery.h
r5744 r5870 41 41 42 42 #include <vector> 43 43 44 #include "ariba/utility/messages.h" 44 45 #include "ariba/utility/serialization.h" … … 61 62 VSERIALIZEABLE; 62 63 public: 63 enum follow_type_ { 64 normal = 0, 65 successor = 1, 66 predecessor = 2 64 enum type_ { 65 invalid = 0, 66 normal = 1, 67 successor = 2, 68 predecessor = 3 67 69 }; 68 70 69 Discovery( const Discovery& msg ) { 70 this->follow_type = msg.follow_type; 71 this->ttl = msg.ttl; 72 this->source_relay = msg.source_relay; 73 this->source_endpoint = msg.source_endpoint; 71 Discovery( const Discovery& msg ) : type(msg.type), ttl(msg.ttl), 72 endpoint(msg.endpoint) { 74 73 } 75 explicit Discovery(); 74 Discovery( type_ type = invalid, uint8_t ttl = 4, 75 const EndpointDescriptor& endpoint = EndpointDescriptor::UNSPECIFIED() ) 76 : type(type), ttl(ttl), endpoint(endpoint) { 77 } 76 78 virtual ~Discovery(); 77 79 78 const EndpointDescriptor* getSourceEndpoint() const {79 return &source_endpoint;80 inline type_ getType() const { 81 return (type_)type; 80 82 } 81 83 82 void setSourceEndpoint( const EndpointDescriptor* endpoint) {83 source_endpoint = *endpoint;84 inline void setType( type_ type ) { 85 this->type = type; 84 86 } 85 87 … … 92 94 } 93 95 94 inline follow_type_ getFollowType() const {95 return (follow_type_)follow_type;96 inline const EndpointDescriptor& getEndpoint() const { 97 return endpoint; 96 98 } 97 99 98 inline void set FollowType( follow_type_ type) {99 follow_type = (uint8_t)type;100 inline void setEndpoint( const EndpointDescriptor& endpoint ) { 101 this->endpoint = endpoint; 100 102 } 101 103 102 inline void setSourceRelay( const NodeID& relay ) {103 source_relay = relay;104 }105 106 inline const NodeID& getSourceRelay() const {107 return source_relay;108 }109 104 private: 110 uint8_t follow_type;105 uint8_t type; 111 106 uint8_t ttl; 112 EndpointDescriptor source_endpoint; 113 NodeID source_relay; 107 EndpointDescriptor endpoint; 114 108 }; 115 109 … … 117 111 118 112 sznBeginDefault( ariba::overlay::Discovery, X ) { 119 /// serialize follow-type and time-to-live 120 X && follow_type && ttl; 121 122 // serialize end-point 123 X && &source_relay && source_endpoint; 113 X && type && ttl && endpoint; 124 114 } sznEnd(); 125 115
Note:
See TracChangeset
for help on using the changeset viewer.