Changeset 3690 for source/ariba/overlay/modules/chord
- Timestamp:
- May 26, 2009, 1:40:23 AM (16 years ago)
- Location:
- source/ariba/overlay/modules/chord
- Files:
-
- 10 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/modules/chord/Chord.cpp
r3689 r3690 1 // [Licen ce]1 // [License] 2 2 // The Ariba-Underlay Copyright 3 3 // … … 35 35 // official policies, either expressed or implied, of the Institute of 36 36 // Telematics. 37 // [Licence] 37 // [License] 38 39 #include "ariba/overlay/BaseOverlay.h" 38 40 39 41 #include "Chord.h" 42 #include "messages/ChordMessage.h" 43 #include "messages/Discovery.h" 44 45 #include "detail/chord_routing_table.hpp" 40 46 41 47 namespace ariba { 42 48 namespace overlay { 43 49 44 use_logging_cpp( OneHop ); 50 typedef chord_routing_table::item route_item; 51 use_logging_cpp( Chord ) 52 ; 45 53 46 54 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, 47 55 OverlayStructureEvents* _eventsReceiver) : 48 OverlayInterface( _baseoverlay, _nodeid, _eventsReceiver ) { 56 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver) { 57 58 // create routing table 59 this->table = new chord_routing_table(_nodeid, 2); 60 orphan_removal_counter = 0; 61 stabilize_counter = 0; 62 stabilize_finger = 0; 63 bootstrapLink = LinkID::UNSPECIFIED; 49 64 } 50 65 51 66 Chord::~Chord() { 67 68 // delete routing table 69 delete table; 70 } 71 72 // helper: sets up a link using the base overlay 73 LinkID Chord::setup(const EndpointDescriptor& endp) { 74 75 logging_debug("request to setup link to " << endp.toString() ); 76 // establish link via base overlay 77 return baseoverlay.establishLink(endp, OverlayInterface::OVERLAY_SERVICE_ID); 78 } 79 80 // helper: sends a message using the "base overlay" 81 seqnum_t Chord::send(Message* msg, const LinkID& link) { 82 return baseoverlay.sendMessage(msg, link); 83 } 84 85 void Chord::send_discovery_to(const NodeID& destination, int ttl) { 86 logging_debug("Initiating discovery of " << destination.toString() ); 87 Message msg; 88 ChordMessage cmsg(ChordMessage::discovery, nodeid, destination); 89 Discovery dmsg; 90 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); 91 dmsg.setFollowType(Discovery::normal); 92 dmsg.setTTL((uint8_t) ttl); 93 cmsg.encapsulate(&dmsg); 94 msg.encapsulate(&cmsg); 95 logging_debug("" << (int)cmsg.getType()); 96 this->onMessage(&msg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 52 97 } 53 98 54 99 void Chord::createOverlay() { 55 56 100 } 57 101 … … 60 104 } 61 105 62 void Chord::joinOverlay( 63 const EndpointDescriptor& bootstrapEp = EndpointDescriptor::UNSPECIFIED 64 ) { 65 106 void Chord::joinOverlay(const EndpointDescriptor& boot) { 107 logging_info( "joining Chord overlay structure through end-point " << 108 (boot == EndpointDescriptor::UNSPECIFIED ? 109 "local" : boot.toString()) ); 110 111 // initiator? no->setup first link 112 if (!(boot == EndpointDescriptor::UNSPECIFIED)) bootstrapLink = setup(boot); 113 114 // timer for stabilization management 115 Timer::setInterval(2500); 116 Timer::start(); 66 117 } 67 118 68 119 void Chord::leaveOverlay() { 69 70 } 71 72 const EndpointDescriptor& Chord::resolveNode( const NodeID& node ) { 73 74 } 75 76 void Chord::routeMessage( const NodeID& destnode, Message* msg ) { 77 78 } 79 80 NodeList Chord::getKnownNodes() const { 81 82 } 83 84 85 }} // namespace ariba, overlay 120 Timer::stop(); 121 for (size_t i = 0; i < table->size(); i++) { 122 route_item* it = (*table)[i]; 123 ChordMessage msg(ChordMessage::leave, nodeid, it->id); 124 send(&msg,it->info); 125 } 126 } 127 128 const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { 129 const route_item* item = table->get(node); 130 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED; 131 return baseoverlay.getEndpointDescriptor(item->info); 132 } 133 134 void Chord::routeMessage(const NodeID& destnode, Message* msg) { 135 // get next hop 136 const route_item* item = table->get_next_hop(destnode); 137 138 // message for this node? yes-> delegate to base overlay 139 if (item->id == nodeid) baseoverlay.incomingRouteMessage(msg); 140 else { // no-> send to next hop 141 ChordMessage cmsg(ChordMessage::route, nodeid, destnode); 142 cmsg.encapsulate(msg); 143 send(&cmsg, item->info); 144 } 145 } 146 147 OverlayInterface::NodeList Chord::getKnownNodes() const { 148 OverlayInterface::NodeList nodelist; 149 for (size_t i = 0; i < table->size(); i++) 150 if ((*table)[i]->ref_count != 0) nodelist.push_back((*table)[i]->id); 151 return nodelist; 152 } 153 154 /// @see CommunicationListener.h 155 /// @see OverlayInterface.h 156 void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { 157 logging_debug("link_up: link=" << lnk.toString() << " remote=" << 158 remote.toString() ); 159 route_item* item = table->insert(remote); 160 161 if (!bootstrapLink.isUnspecified() && lnk == bootstrapLink) { 162 send_discovery_to(nodeid); 163 bootstrapLink = LinkID::UNSPECIFIED; 164 } 165 166 // item added to routing table? 167 if (item != NULL) { // yes-> add to routing table 168 logging_debug("new routing neighbor: " << remote.toString() ); 169 item->info = lnk; 170 } else { // no-> add orphan entry to routing table 171 logging_debug("new orphan: " << remote.toString() ); 172 table->insert_orphan(remote)->info = lnk; 173 } 174 } 175 176 /// @see CommunicationListener.h or @see OverlayInterface.h 177 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { 178 logging_debug("link_down: link=" << lnk.toString() << " remote=" << 179 remote.toString() ); 180 181 // remove link from routing table 182 table->remove(remote); 183 } 184 185 /// @see CommunicationListener.h 186 /// @see OverlayInterface.h 187 void Chord::onMessage(const DataMessage& msg, const NodeID& remote, 188 const LinkID& link) { 189 190 // decode message 191 typedef ChordMessage M; 192 M* m = msg.getMessage()->convert<ChordMessage> (); 193 if (m == NULL) return; 194 195 logging_debug("onMessage: type=" << (int)m->getType() ); 196 197 // handle messages 198 switch (m->getType()) { 199 200 // invalid message 201 case M::invalid: 202 break; 203 204 // route message with payload 205 case M::route: { 206 // find next hop 207 const route_item* item = table->get_next_hop(m->getDestination()); 208 209 // next hop == myself? 210 if (m->getDestination() == nodeid) { // yes-> route to base overlay 211 logging_debug("send message to baseoverlay"); 212 baseoverlay.incomingRouteMessage(m); 213 } 214 // no-> route to next hop 215 else { 216 logging_debug("route chord message to " << item->id.toString() ); 217 send(m, item->info); 218 } 219 break; 220 } 221 222 // discovery request 223 case M::discovery: { 224 // decapsulate message 225 Discovery* dmsg = m->decapsulate<Discovery> (); 226 logging_debug("received discovery message with" 227 << " dest=" << m->getDestination().toString() 228 << " ttl=" << (int)dmsg->getTTL() 229 << " type=" << (int)dmsg->getFollowType() 230 ); 231 232 // check if source node can be added to routing table and setup link 233 if (m->getSource() != nodeid && table->is_insertable(m->getSource())) setup( 234 *dmsg->getSourceEndpoint()); 235 236 // delegate discovery message 237 switch (dmsg->getFollowType()) { 238 239 // normal: route discovery message like every other message 240 case Discovery::normal: 241 // closest node? yes-> split to follow successor and predecessor 242 if (table->is_closest_to(m->getDestination())) { 243 244 if (table->get_successor() != NULL) { 245 // send successor message 246 ChordMessage cmsg_s(*m); 247 Discovery dmsg_s(*dmsg); 248 dmsg_s.setFollowType(Discovery::successor); 249 cmsg_s.encapsulate(&dmsg_s); 250 route_item* succ_item = table->get(*table->get_successor()); 251 logging_debug("split: routing discovery message to successor " 252 << succ_item->id.toString() ); 253 send(&cmsg_s, succ_item->info); 254 } 255 256 // send predecessor message 257 if (table->get_predesessor() != NULL) { 258 ChordMessage cmsg_p(*m); 259 Discovery dmsg_p(*dmsg); 260 dmsg_p.setFollowType(Discovery::predecessor); 261 cmsg_p.encapsulate(&dmsg_p); 262 route_item* pred_item = table->get( 263 *table->get_predesessor()); 264 logging_debug("split: routing discovery message to predecessor " 265 << pred_item->id.toString() ); 266 send(&cmsg_p, pred_item->info); 267 } 268 } 269 // no-> route message 270 else { 271 // find next hop 272 const route_item* item = table->get_next_hop( 273 m->getDestination()); 274 if (item->id == nodeid) break; 275 logging_debug("routing discovery message to " << 276 item->id.toString() ); 277 send(m, item->info); 278 } 279 break; 280 281 // successor mode: follow the successor until TTL is zero 282 case Discovery::successor: 283 case Discovery::predecessor: { 284 // time to live ended? yes-> stop routing 285 if (dmsg->getTTL() == 0) break; 286 287 // decrease time-to-live 288 dmsg->setTTL(dmsg->getTTL() - 1); 289 290 const route_item* item; 291 if (dmsg->getFollowType() == Discovery::successor) item 292 = table->get(*table->get_successor()); 293 else item = table->get(*table->get_predesessor()); 294 logging_debug("routing discovery message to succ/pred " 295 << item->id.toString() ); 296 ChordMessage cmsg(*m); 297 cmsg.encapsulate(dmsg); 298 send(&cmsg, item->info); 299 break; 300 } 301 } 302 break; 303 } 304 305 // leave 306 case M::leave: { 307 baseoverlay.dropLink(link); 308 break; 309 } 310 } 311 } 312 313 void Chord::eventFunction() { 314 stabilize_counter++; 315 if (stabilize_counter == 3) { 316 stabilize_counter = 0; 317 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); 318 logging_debug("sending discovery message to my neighbors"); 319 send_discovery_to(nodeid); 320 send_discovery_to( 321 table->get_finger_table(stabilize_finger).get_compare().get_center() 322 ); 323 orphan_removal_counter++; 324 if (orphan_removal_counter == 2) { 325 orphan_removal_counter = 0; 326 for (size_t i = 0; i < table->size(); i++) { 327 route_item* it = (*table)[i]; 328 if (it->ref_count == 0) { 329 baseoverlay.dropLink(it->info); 330 logging_debug("dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); 331 } 332 } 333 } 334 } 335 logging_debug("--- chord routing information ----------------------------------"); 336 logging_debug("predecessor: " << (table->get_predesessor()==NULL? "<none>" : table->get_predesessor()->toString()) ); 337 logging_debug("node_id : " << nodeid.toString() ); 338 logging_debug("successor : " << (table->get_successor()==NULL? "<none>" : table->get_successor()->toString())); 339 logging_debug("----------------------------------------------------------------"); 340 } 341 342 } 343 } // namespace ariba, overlay -
source/ariba/overlay/modules/chord/Chord.h
r3689 r3690 1 // [Licen ce]1 // [License] 2 2 // The Ariba-Underlay Copyright 3 3 // … … 35 35 // official policies, either expressed or implied, of the Institute of 36 36 // Telematics. 37 // [Licen ce]37 // [License] 38 38 39 39 #ifndef CHORD_H_ 40 40 #define CHORD_H_ 41 41 42 #include "ariba/utility/system/Timer.h" 42 43 #include "ariba/utility/logging/Logging.h" 43 44 #include "ariba/communication/EndpointDescriptor.h" 45 #include "../OverlayInterface.h" 46 #include <vector> 47 48 class chord_routing_table; 44 49 45 50 namespace ariba { 46 51 namespace overlay { 47 52 48 using ariba::overlay::OverlayInterface;49 53 using ariba::communication::EndpointDescriptor; 54 using ariba::utility::Timer; 50 55 51 class Chord : public OverlayInterface { 56 using namespace std; 57 58 /** 59 * This class implements a structured overlay inspired by chord. 60 * It differs to the original form of chord in the following manner: 61 * 62 * (1) The graph is bidirectional 63 * (2) Stabilization is done in a reactive manner 64 * 65 * It therefore can be considered as a kind of Chorded-Kademlia :) 66 * 67 * The resulting overlay graph has a diameter of O(log N). 68 * 69 * @author Sebastian Mies <mies@tm.uka.de> 70 */ 71 class Chord : public OverlayInterface, protected Timer { 52 72 use_logging_h( Chord ); 53 73 private: 74 chord_routing_table* table; 75 int orphan_removal_counter; 76 int stabilize_counter; 77 int stabilize_finger; 78 LinkID bootstrapLink; 79 80 // helper: sets up a link using the "base overlay" 81 LinkID setup( const EndpointDescriptor& endp ); 82 83 // helper: sends a message using the "base overlay" 84 seqnum_t send( Message* msg, const LinkID& link ); 85 86 // stabilization: sends a discovery message to the specified neighborhood 87 void send_discovery_to( const NodeID& destination, int ttl = 4 ); 54 88 55 89 public: 56 90 Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, 57 91 OverlayStructureEvents* _eventsReceiver); 58 ~Chord();92 virtual ~Chord(); 59 93 60 void createOverlay(); 94 /// @see OverlayInterface.h 95 virtual void createOverlay(); 61 96 62 void deleteOverlay(); 97 /// @see OverlayInterface.h 98 virtual void deleteOverlay(); 63 99 64 void joinOverlay( 65 const EndpointDescriptor& bootstrapEp = EndpointDescriptor::UNSPECIFIED 100 /// @see OverlayInterface.h 101 virtual void joinOverlay( 102 const EndpointDescriptor& boot = EndpointDescriptor::UNSPECIFIED 66 103 ); 67 104 68 void leaveOverlay(); 105 /// @see OverlayInterface.h 106 virtual void leaveOverlay(); 69 107 70 const EndpointDescriptor& resolveNode( const NodeID& node ); 108 /// @see OverlayInterface.h 109 virtual const EndpointDescriptor& resolveNode( const NodeID& node ); 71 110 72 void routeMessage( const NodeID& destnode, Message* msg ); 111 /// @see OverlayInterface.h 112 virtual void routeMessage( const NodeID& destnode, Message* msg ); 73 113 74 NodeList getKnownNodes() const; 114 /// @see OverlayInterface.h 115 virtual NodeList getKnownNodes() const; 116 117 /// @see CommunicationListener.h or @see OverlayInterface.h 118 virtual void onLinkUp(const LinkID& lnk, const NodeID& remote); 119 120 /// @see CommunicationListener.h or @see OverlayInterface.h 121 virtual void onLinkDown(const LinkID& lnk, const NodeID& remote); 122 123 /// @see CommunicationListener.h or @see OverlayInterface.h 124 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 125 const LinkID& lnk = LinkID::UNSPECIFIED); 126 127 /// @see Timer.h 128 virtual void eventFunction(); 75 129 }; 76 130
Note:
See TracChangeset
for help on using the changeset viewer.