1 | // [License] |
---|
2 | // The Ariba-Underlay Copyright |
---|
3 | // |
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) |
---|
5 | // |
---|
6 | // Institute of Telematics |
---|
7 | // Universität Karlsruhe (TH) |
---|
8 | // Zirkel 2, 76128 Karlsruhe |
---|
9 | // Germany |
---|
10 | // |
---|
11 | // Redistribution and use in source and binary forms, with or without |
---|
12 | // modification, are permitted provided that the following conditions are |
---|
13 | // met: |
---|
14 | // |
---|
15 | // 1. Redistributions of source code must retain the above copyright |
---|
16 | // notice, this list of conditions and the following disclaimer. |
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright |
---|
18 | // notice, this list of conditions and the following disclaimer in the |
---|
19 | // documentation and/or other materials provided with the distribution. |
---|
20 | // |
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND |
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR |
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | // |
---|
33 | // The views and conclusions contained in the software and documentation |
---|
34 | // are those of the authors and should not be interpreted as representing |
---|
35 | // official policies, either expressed or implied, of the Institute of |
---|
36 | // Telematics. |
---|
37 | // [License] |
---|
38 | |
---|
39 | #include "ariba/overlay/BaseOverlay.h" |
---|
40 | #include "ariba/overlay/messages/OverlayMsg.h" |
---|
41 | |
---|
42 | #include "Chord.h" |
---|
43 | #include "detail/chord_routing_table.hpp" |
---|
44 | |
---|
45 | //#include "messages/Discovery.h" // XXX DEPRECATED |
---|
46 | |
---|
47 | namespace ariba { |
---|
48 | namespace overlay { |
---|
49 | |
---|
50 | enum signalMessageTypes { |
---|
51 | typeDiscovery = OverlayMsg::typeSignalingStart + 0x01, |
---|
52 | typeLeave = OverlayMsg::typeSignalingStart + 0x02, |
---|
53 | }; |
---|
54 | |
---|
55 | typedef chord_routing_table::item route_item; |
---|
56 | |
---|
57 | using ariba::transport::system_priority; |
---|
58 | |
---|
59 | use_logging_cpp( Chord ); |
---|
60 | |
---|
61 | |
---|
62 | ////// Messages |
---|
63 | struct DiscoveryMessage |
---|
64 | { |
---|
65 | /** |
---|
66 | * DiscoveryMessage |
---|
67 | * - type |
---|
68 | * - data |
---|
69 | * - Endpoint |
---|
70 | */ |
---|
71 | |
---|
72 | // type enum |
---|
73 | enum type_ { |
---|
74 | invalid = 0, |
---|
75 | normal = 1, |
---|
76 | successor = 2, |
---|
77 | predecessor = 3 |
---|
78 | }; |
---|
79 | |
---|
80 | |
---|
81 | // data |
---|
82 | uint8_t type; |
---|
83 | uint8_t ttl; |
---|
84 | EndpointDescriptor endpoint; |
---|
85 | |
---|
86 | // serialize |
---|
87 | reboost::message_t serialize() |
---|
88 | { |
---|
89 | // serialize endpoint |
---|
90 | reboost::message_t msg = endpoint.serialize(); |
---|
91 | |
---|
92 | // serialize type and ttl |
---|
93 | uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data(); |
---|
94 | buff1[0] = type; |
---|
95 | buff1[1] = ttl; |
---|
96 | |
---|
97 | return msg; |
---|
98 | } |
---|
99 | |
---|
100 | //deserialize |
---|
101 | reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff) |
---|
102 | { |
---|
103 | // deserialize type and ttl |
---|
104 | const uint8_t* bytes = buff.data(); |
---|
105 | type = bytes[0]; |
---|
106 | ttl = bytes[1]; |
---|
107 | |
---|
108 | // deserialize endpoint |
---|
109 | return endpoint.deserialize(buff(2*sizeof(uint8_t))); |
---|
110 | } |
---|
111 | }; |
---|
112 | |
---|
113 | |
---|
114 | Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, |
---|
115 | OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) : |
---|
116 | OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) { |
---|
117 | |
---|
118 | // create routing table |
---|
119 | this->table = new chord_routing_table(_nodeid, 4); |
---|
120 | orphan_removal_counter = 0; |
---|
121 | discovery_count = 0; |
---|
122 | stabilize_counter = 0; |
---|
123 | stabilize_finger = 0; |
---|
124 | } |
---|
125 | |
---|
126 | Chord::~Chord() { |
---|
127 | |
---|
128 | // delete routing table |
---|
129 | delete table; |
---|
130 | } |
---|
131 | |
---|
132 | /// helper: sets up a link using the base overlay |
---|
133 | LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) { |
---|
134 | |
---|
135 | // check if we already have a connection |
---|
136 | for (size_t i=0; i<table->size(); i++) |
---|
137 | if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified())) |
---|
138 | return LinkID::UNSPECIFIED; |
---|
139 | |
---|
140 | // check if we are already trying to establish a link |
---|
141 | for (size_t i=0; i<pending.size(); i++) |
---|
142 | if ( pending[i] == remote ) { |
---|
143 | logging_debug("Already trying to establish a link to node " |
---|
144 | << remote.toString() ); |
---|
145 | return LinkID::UNSPECIFIED; |
---|
146 | } |
---|
147 | |
---|
148 | // adding node to list of pending connections |
---|
149 | pending.push_back( remote ); |
---|
150 | |
---|
151 | logging_info("Request to setup link to " << endpoint.toString() ); |
---|
152 | |
---|
153 | // establish link via base overlay |
---|
154 | return baseoverlay.establishLink( endpoint, remote, |
---|
155 | OverlayInterface::OVERLAY_SERVICE_ID ); |
---|
156 | } |
---|
157 | |
---|
158 | /// helper: sends a message using the "base overlay" |
---|
159 | void Chord::send( OverlayMsg* msg, const LinkID& link ) { |
---|
160 | if (link.isUnspecified()) |
---|
161 | return; |
---|
162 | |
---|
163 | try |
---|
164 | { |
---|
165 | baseoverlay.send_link( msg, link, system_priority::OVERLAY ); |
---|
166 | } |
---|
167 | catch ( message_not_sent& e ) |
---|
168 | { |
---|
169 | logging_warn("Chord: Could not send message over link " << link |
---|
170 | << ": " << e.what()); |
---|
171 | } |
---|
172 | } |
---|
173 | |
---|
174 | void Chord::send_node( OverlayMsg* message, const NodeID& remote ) |
---|
175 | { |
---|
176 | try |
---|
177 | { |
---|
178 | baseoverlay.send( message, remote, system_priority::OVERLAY ); |
---|
179 | } |
---|
180 | catch ( message_not_sent& e ) |
---|
181 | { |
---|
182 | logging_warn("Chord: Could not send message to " << remote |
---|
183 | << ": " << e.what()); |
---|
184 | } |
---|
185 | } |
---|
186 | |
---|
187 | /// sends a discovery message |
---|
188 | void Chord::send_discovery_to(const NodeID& remote, int ttl) { |
---|
189 | LinkID link = getNextLinkId(remote); |
---|
190 | if ( remote == nodeid || link.isUnspecified()) return; |
---|
191 | if ( table->size() == 0 ) return; |
---|
192 | ttl = 2; |
---|
193 | |
---|
194 | OverlayMsg msg( typeDiscovery ); |
---|
195 | msg.setRegisterRelay(true); |
---|
196 | |
---|
197 | // create DiscoveryMessage |
---|
198 | DiscoveryMessage dmsg; |
---|
199 | dmsg.type = DiscoveryMessage::normal; |
---|
200 | dmsg.ttl = ttl; |
---|
201 | dmsg.endpoint = baseoverlay.getEndpointDescriptor(); |
---|
202 | |
---|
203 | msg.set_payload_message(dmsg.serialize()); |
---|
204 | |
---|
205 | // send to node |
---|
206 | try |
---|
207 | { |
---|
208 | baseoverlay.send_node( &msg, remote, system_priority::OVERLAY ); |
---|
209 | } |
---|
210 | catch ( message_not_sent& e ) |
---|
211 | { |
---|
212 | logging_warn("Chord: Could not send message to " << remote |
---|
213 | << ": " << e.what()); |
---|
214 | } |
---|
215 | } |
---|
216 | |
---|
217 | void Chord::discover_neighbors( const LinkID& link ) { |
---|
218 | uint8_t ttl = 1; |
---|
219 | |
---|
220 | // FIXME try-catch for the send operations |
---|
221 | |
---|
222 | // create DiscoveryMessage |
---|
223 | DiscoveryMessage dmsg; |
---|
224 | dmsg.ttl = ttl; |
---|
225 | dmsg.endpoint = baseoverlay.getEndpointDescriptor(); |
---|
226 | { |
---|
227 | // send predecessor discovery |
---|
228 | OverlayMsg msg( typeDiscovery ); |
---|
229 | msg.setRegisterRelay(true); |
---|
230 | |
---|
231 | // set type |
---|
232 | dmsg.type = DiscoveryMessage::predecessor; |
---|
233 | |
---|
234 | // send |
---|
235 | msg.set_payload_message(dmsg.serialize()); |
---|
236 | send(&msg, link); |
---|
237 | } |
---|
238 | { |
---|
239 | // send successor discovery |
---|
240 | OverlayMsg msg( typeDiscovery ); |
---|
241 | // msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it? |
---|
242 | msg.setRegisterRelay(true); |
---|
243 | |
---|
244 | // set type |
---|
245 | dmsg.type = DiscoveryMessage::successor; |
---|
246 | |
---|
247 | // send |
---|
248 | msg.set_payload_message(dmsg.serialize()); |
---|
249 | send(&msg, link); |
---|
250 | } |
---|
251 | } |
---|
252 | |
---|
253 | |
---|
254 | void Chord::createOverlay() { |
---|
255 | } |
---|
256 | |
---|
257 | void Chord::deleteOverlay() { |
---|
258 | |
---|
259 | } |
---|
260 | |
---|
261 | void Chord::joinOverlay(const EndpointDescriptor& boot) { |
---|
262 | logging_info( "joining Chord overlay structure through end-point " << |
---|
263 | (boot.isUnspecified() ? "local" : boot.toString()) ); |
---|
264 | |
---|
265 | // initiator? no->setup first link |
---|
266 | if (!boot.isUnspecified()) |
---|
267 | bootstrapLinks.push_back( setup(boot) ); |
---|
268 | |
---|
269 | // timer for stabilization management |
---|
270 | // Timer::setInterval(1000); // TODO find an appropriate interval! |
---|
271 | Timer::setInterval(10000); // XXX testing... |
---|
272 | Timer::start(); |
---|
273 | } |
---|
274 | |
---|
275 | void Chord::leaveOverlay() { |
---|
276 | Timer::stop(); |
---|
277 | for (size_t i = 0; i < table->size(); i++) { |
---|
278 | route_item* it = (*table)[i]; |
---|
279 | OverlayMsg msg( typeLeave ); |
---|
280 | send( &msg, it->info ); |
---|
281 | } |
---|
282 | } |
---|
283 | |
---|
284 | /// @see OverlayInterface.h |
---|
285 | const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { |
---|
286 | const route_item* item = table->get(node); |
---|
287 | if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); |
---|
288 | return baseoverlay.getEndpointDescriptor(item->info); |
---|
289 | } |
---|
290 | |
---|
291 | /// @see OverlayInterface.h |
---|
292 | bool Chord::isClosestNodeTo( const NodeID& node ) { |
---|
293 | return table->is_closest_to(node); |
---|
294 | } |
---|
295 | |
---|
296 | /// @see OverlayInterface.h |
---|
297 | const LinkID& Chord::getNextLinkId( const NodeID& id ) const { |
---|
298 | // get next hop |
---|
299 | const route_item* item = table->get_next_hop(id); |
---|
300 | |
---|
301 | // returns a unspecified id when this is itself |
---|
302 | if (item == NULL || item->id == nodeid) |
---|
303 | return LinkID::UNSPECIFIED; |
---|
304 | |
---|
305 | /// return routing info |
---|
306 | return item->info; |
---|
307 | } |
---|
308 | |
---|
309 | std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode( |
---|
310 | const NodeID& id, int num ) const |
---|
311 | { |
---|
312 | std::vector<const LinkID*> ret; |
---|
313 | |
---|
314 | switch ( num ) |
---|
315 | { |
---|
316 | // special case: just call »getNextLinkId« |
---|
317 | case 1: |
---|
318 | { |
---|
319 | ret.push_back(&getNextLinkId(id)); |
---|
320 | |
---|
321 | break; |
---|
322 | } |
---|
323 | |
---|
324 | // * calculate top 2 * |
---|
325 | case 0: |
---|
326 | case 2: |
---|
327 | { |
---|
328 | std::vector<const route_item*> items = table->get_next_2_hops(id); |
---|
329 | |
---|
330 | ret.reserve(items.size()); |
---|
331 | |
---|
332 | BOOST_FOREACH( const route_item* item, items ) |
---|
333 | { |
---|
334 | ret.push_back(&item->info); |
---|
335 | } |
---|
336 | |
---|
337 | break; |
---|
338 | } |
---|
339 | |
---|
340 | // NOTE: implement real sorting, if needed (and handle "case 0" properly, then) |
---|
341 | default: |
---|
342 | { |
---|
343 | throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)"); |
---|
344 | |
---|
345 | break; |
---|
346 | } |
---|
347 | } |
---|
348 | |
---|
349 | return ret; |
---|
350 | } |
---|
351 | |
---|
352 | |
---|
353 | /// @see OverlayInterface.h |
---|
354 | const NodeID& Chord::getNextNodeId( const NodeID& id ) const { |
---|
355 | // get next hop |
---|
356 | const route_item* item = table->get_next_hop(id); |
---|
357 | |
---|
358 | // return unspecified if no next hop could be found |
---|
359 | if (item == NULL) { |
---|
360 | return NodeID::UNSPECIFIED; |
---|
361 | } |
---|
362 | |
---|
363 | return item->id; |
---|
364 | } |
---|
365 | |
---|
366 | OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const { |
---|
367 | OverlayInterface::NodeList nodelist; |
---|
368 | |
---|
369 | if( deep ){ |
---|
370 | // all nodes that I know, fingers, succ/pred |
---|
371 | for (size_t i = 0; i < table->size(); i++){ |
---|
372 | if ((*table)[i]->ref_count != 0 |
---|
373 | && !(*table)[i]->info.isUnspecified()) |
---|
374 | nodelist.push_back((*table)[i]->id); |
---|
375 | } |
---|
376 | } else { |
---|
377 | // only succ and pred |
---|
378 | if( table->get_predesessor() != NULL ){ |
---|
379 | nodelist.push_back( *(table->get_predesessor()) ); |
---|
380 | } |
---|
381 | if( table->get_successor() != NULL ){ |
---|
382 | OverlayInterface::NodeList::iterator i = |
---|
383 | std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) ); |
---|
384 | if( i == nodelist.end() ) |
---|
385 | nodelist.push_back( *(table->get_successor()) ); |
---|
386 | } |
---|
387 | } |
---|
388 | |
---|
389 | return nodelist; |
---|
390 | } |
---|
391 | |
---|
392 | /// @see CommunicationListener.h |
---|
393 | /// @see OverlayInterface.h |
---|
394 | void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { |
---|
395 | logging_info("link_up: link=" << lnk.toString() << " remote=" << |
---|
396 | remote.toString() ); |
---|
397 | for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++) |
---|
398 | if (*i == remote) { |
---|
399 | pending.erase(i); |
---|
400 | break; |
---|
401 | } |
---|
402 | |
---|
403 | if (remote==nodeid) { |
---|
404 | logging_warn("dropping link that has been established to myself (nodes have same nodeid?)"); |
---|
405 | logging_warn("NodeID: " << remote); |
---|
406 | baseoverlay.dropLink(lnk); |
---|
407 | return; |
---|
408 | } |
---|
409 | |
---|
410 | route_item* item = table->insert(remote); |
---|
411 | |
---|
412 | // item added to routing table? |
---|
413 | if (item != NULL) { // yes-> add to routing table |
---|
414 | logging_info("new routing neighbor: " << remote.toString() |
---|
415 | << " with link " << lnk.toString()); |
---|
416 | |
---|
417 | // replace with new link if link is "better" |
---|
418 | if (item->info!=lnk && item->info.isUnspecified()==false) { |
---|
419 | if (baseoverlay.compare( item->info, lnk ) == 1) { |
---|
420 | logging_info("Replacing link due to concurrent link establishment."); |
---|
421 | baseoverlay.dropLink(item->info); |
---|
422 | item->info = lnk; |
---|
423 | } |
---|
424 | } else { |
---|
425 | item->info = lnk; |
---|
426 | } |
---|
427 | |
---|
428 | // discover neighbors of new overlay neighbor |
---|
429 | showLinks(); |
---|
430 | } else { // no-> add orphan entry to routing table |
---|
431 | logging_info("new orphan: " << remote.toString() |
---|
432 | << " with link " << lnk.toString()); |
---|
433 | table->insert_orphan(remote)->info = lnk; |
---|
434 | } |
---|
435 | |
---|
436 | // erase bootstrap link |
---|
437 | vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk); |
---|
438 | if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); |
---|
439 | } |
---|
440 | |
---|
441 | /// @see CommunicationListener.h or @see OverlayInterface.h |
---|
442 | void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { |
---|
443 | // XXX logging_debug |
---|
444 | logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" << |
---|
445 | remote.toString() ); |
---|
446 | |
---|
447 | // remove link from routing table |
---|
448 | route_item* item = table->get(remote); |
---|
449 | if (item!=NULL && item->info==lnk) { |
---|
450 | item->info = LinkID::UNSPECIFIED; |
---|
451 | table->remove(remote); |
---|
452 | } |
---|
453 | } |
---|
454 | |
---|
455 | /// @see CommunicationListener.h |
---|
456 | /// @see OverlayInterface.h |
---|
457 | void Chord::onMessage(OverlayMsg* msg, |
---|
458 | reboost::shared_buffer_t sub_msg, |
---|
459 | const NodeID& remote, |
---|
460 | const LinkID& link) { |
---|
461 | |
---|
462 | // handle messages |
---|
463 | switch ((signalMessageTypes) msg->getType()) { |
---|
464 | |
---|
465 | // discovery request |
---|
466 | case typeDiscovery: |
---|
467 | { |
---|
468 | // deserialize discovery message |
---|
469 | DiscoveryMessage dmsg; |
---|
470 | dmsg.deserialize(sub_msg); |
---|
471 | |
---|
472 | logging_debug("Received discovery message with" |
---|
473 | << " src=" << msg->getSourceNode().toString() |
---|
474 | << " dst=" << msg->getDestinationNode().toString() |
---|
475 | << " ttl=" << (int)dmsg.ttl |
---|
476 | << " type=" << (int)dmsg.type |
---|
477 | ); |
---|
478 | |
---|
479 | // add discovery node id |
---|
480 | bool found = false; |
---|
481 | BOOST_FOREACH( NodeID& value, discovery ) |
---|
482 | if (value == msg->getSourceNode()) { |
---|
483 | found = true; |
---|
484 | break; |
---|
485 | } |
---|
486 | if (!found) discovery.push_back(msg->getSourceNode()); |
---|
487 | |
---|
488 | // check if source node can be added to routing table and setup link |
---|
489 | if (msg->getSourceNode() != nodeid) |
---|
490 | setup( dmsg.endpoint, msg->getSourceNode() ); |
---|
491 | |
---|
492 | // process discovery message -------------------------- switch start -- |
---|
493 | switch ( dmsg.type ) |
---|
494 | { |
---|
495 | // normal: route discovery message like every other message |
---|
496 | case DiscoveryMessage::normal: |
---|
497 | { |
---|
498 | // closest node? yes-> split to follow successor and predecessor |
---|
499 | if ( table->is_closest_to(msg->getDestinationNode()) ) |
---|
500 | { |
---|
501 | logging_debug("Discovery split:"); |
---|
502 | if (!table->get_successor()->isUnspecified()) |
---|
503 | { |
---|
504 | OverlayMsg omsg(*msg); |
---|
505 | |
---|
506 | dmsg.type = DiscoveryMessage::successor; |
---|
507 | omsg.set_payload_message(dmsg.serialize()); |
---|
508 | |
---|
509 | logging_debug("* Routing to successor " |
---|
510 | << table->get_successor()->toString() ); |
---|
511 | send_node( &omsg, *table->get_successor() ); |
---|
512 | } |
---|
513 | |
---|
514 | // send predecessor message |
---|
515 | if (!table->get_predesessor()->isUnspecified()) |
---|
516 | { |
---|
517 | OverlayMsg omsg(*msg); |
---|
518 | |
---|
519 | dmsg.type = DiscoveryMessage::predecessor; |
---|
520 | omsg.set_payload_message(dmsg.serialize()); |
---|
521 | |
---|
522 | logging_debug("* Routing to predecessor " |
---|
523 | << table->get_predesessor()->toString() ); |
---|
524 | send_node( &omsg, *table->get_predesessor() ); |
---|
525 | } |
---|
526 | } |
---|
527 | // no-> route message |
---|
528 | else |
---|
529 | { |
---|
530 | baseoverlay.route( msg ); |
---|
531 | } |
---|
532 | break; |
---|
533 | } |
---|
534 | |
---|
535 | // successor mode: follow the successor until TTL is zero |
---|
536 | case DiscoveryMessage::successor: |
---|
537 | case DiscoveryMessage::predecessor: |
---|
538 | { |
---|
539 | // reached destination? no->forward! |
---|
540 | if (msg->getDestinationNode() != nodeid) |
---|
541 | { |
---|
542 | OverlayMsg omsg(*msg); |
---|
543 | omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); |
---|
544 | |
---|
545 | omsg.set_payload_message(dmsg.serialize()); |
---|
546 | |
---|
547 | baseoverlay.route( &omsg ); |
---|
548 | break; |
---|
549 | } |
---|
550 | |
---|
551 | // time to live ended? yes-> stop routing |
---|
552 | if (dmsg.ttl == 0 || dmsg.ttl > 10) break; |
---|
553 | |
---|
554 | // decrease time-to-live |
---|
555 | dmsg.ttl--; |
---|
556 | |
---|
557 | const route_item* item = NULL; |
---|
558 | if (dmsg.type == DiscoveryMessage::successor && |
---|
559 | table->get_successor() != NULL) |
---|
560 | { |
---|
561 | item = table->get(*table->get_successor()); |
---|
562 | } |
---|
563 | else if (table->get_predesessor() != NULL) |
---|
564 | { |
---|
565 | item = table->get(*table->get_predesessor()); |
---|
566 | } |
---|
567 | if (item == NULL) |
---|
568 | break; |
---|
569 | |
---|
570 | logging_debug("Routing discovery message to succ/pred " |
---|
571 | << item->id.toString() ); |
---|
572 | OverlayMsg omsg(*msg); |
---|
573 | omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); |
---|
574 | omsg.setDestinationNode(item->id); |
---|
575 | |
---|
576 | omsg.set_payload_message(dmsg.serialize()); |
---|
577 | |
---|
578 | send_node( &omsg, omsg.getDestinationNode() ); |
---|
579 | break; |
---|
580 | } |
---|
581 | case DiscoveryMessage::invalid: |
---|
582 | break; |
---|
583 | |
---|
584 | default: |
---|
585 | break; |
---|
586 | } |
---|
587 | // process discovery message ---------------------------- switch end -- |
---|
588 | |
---|
589 | break; |
---|
590 | } |
---|
591 | |
---|
592 | // leave |
---|
593 | case typeLeave: { |
---|
594 | if (link!=LinkID::UNSPECIFIED) { |
---|
595 | route_item* item = table->get(remote); |
---|
596 | if (item!=NULL) item->info = LinkID::UNSPECIFIED; |
---|
597 | table->remove(remote); |
---|
598 | baseoverlay.dropLink(link); |
---|
599 | } |
---|
600 | break; |
---|
601 | } |
---|
602 | } |
---|
603 | } |
---|
604 | |
---|
605 | void Chord::eventFunction() { |
---|
606 | stabilize_counter++; |
---|
607 | if (stabilize_counter < 0 || stabilize_counter == 2) { |
---|
608 | |
---|
609 | // reset counter |
---|
610 | stabilize_counter = 0; |
---|
611 | |
---|
612 | // clear pending connections |
---|
613 | pending.clear(); |
---|
614 | |
---|
615 | // get number of real neighbors |
---|
616 | size_t numNeighbors = 0; |
---|
617 | for (size_t i = 0; i < table->size(); i++) { |
---|
618 | route_item* it = (*table)[i]; |
---|
619 | if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++; |
---|
620 | } |
---|
621 | logging_info("Running stabilization: #links=" |
---|
622 | << table->size() << " #neighbors=" << numNeighbors ); |
---|
623 | |
---|
624 | // updating neighbors |
---|
625 | logging_debug("Discover new ring neighbors"); |
---|
626 | for (size_t i=0; i<table->size(); i++) { |
---|
627 | LinkID id = (*table)[i]->info; |
---|
628 | if (!id.isUnspecified()) discover_neighbors(id); |
---|
629 | } |
---|
630 | |
---|
631 | // sending discovery |
---|
632 | logging_debug("Sending discovery message to my neighbors and fingers"); |
---|
633 | stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); |
---|
634 | const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center(); |
---|
635 | if (disc != nodeid) |
---|
636 | send_discovery_to(disc); |
---|
637 | |
---|
638 | // remove orphan links |
---|
639 | orphan_removal_counter++; |
---|
640 | if (orphan_removal_counter <0 || orphan_removal_counter >= 2) { |
---|
641 | logging_info("Discovered nodes: "); |
---|
642 | BOOST_FOREACH( NodeID& id, discovery ) |
---|
643 | logging_info("* " << id.toString()); |
---|
644 | discovery.clear(); |
---|
645 | logging_info("Running orphan removal"); |
---|
646 | orphan_removal_counter = 0; |
---|
647 | for (size_t i = 0; i < table->size(); i++) { |
---|
648 | route_item* it = (*table)[i]; |
---|
649 | if (it->ref_count == 0 && !it->info.isUnspecified()) { |
---|
650 | logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); |
---|
651 | table->insert(it->id); |
---|
652 | if (it->ref_count==0) { |
---|
653 | LinkID id = it->info; |
---|
654 | it->info = LinkID::UNSPECIFIED; |
---|
655 | baseoverlay.dropLink(id); |
---|
656 | } |
---|
657 | } |
---|
658 | } |
---|
659 | } |
---|
660 | } |
---|
661 | } |
---|
662 | |
---|
663 | void Chord::showLinks() { |
---|
664 | logging_info("--- chord routing information ----------------------------------"); |
---|
665 | logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" : |
---|
666 | table->get_predesessor()->toString()) ); |
---|
667 | logging_info("node_id : " << nodeid.toString() ); |
---|
668 | logging_info("successor : " << (table->get_successor()==NULL? "<none>" : |
---|
669 | table->get_successor()->toString())); |
---|
670 | logging_info("----------------------------------------------------------------"); |
---|
671 | } |
---|
672 | |
---|
673 | /// @see OverlayInterface.h |
---|
674 | std::string Chord::debugInformation() const { |
---|
675 | std::ostringstream s; |
---|
676 | s << "protocol : Chord" << endl; |
---|
677 | s << "node_id : " << nodeid.toString() << endl; |
---|
678 | s << "predecessor: " << (table->get_predesessor()==NULL? "<none>" : |
---|
679 | table->get_predesessor()->toString()) << endl; |
---|
680 | s << "successor : " << (table->get_successor()==NULL? "<none>" : |
---|
681 | table->get_successor()->toString()) << endl; |
---|
682 | s << "nodes: " << endl; |
---|
683 | for (size_t i = 0; i < table->size(); i++) { |
---|
684 | route_item* it = (*table)[i]; |
---|
685 | if (it->ref_count != 0 && !it->info.isUnspecified()) { |
---|
686 | s << it->id.toString().substr(0,6) |
---|
687 | << " using " << it->info.toString().substr(0,6) << endl; |
---|
688 | } |
---|
689 | } |
---|
690 | return s.str(); |
---|
691 | } |
---|
692 | |
---|
693 | |
---|
694 | |
---|
695 | }} // namespace ariba, overlay |
---|