close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/modules/chord/Chord.cpp@ 12775

Last change on this file since 12775 was 12775, checked in by hock@…, 11 years ago

Chord: catch message_not_sent exception and print a warning (instead of crashing)

File size: 20.2 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "ariba/overlay/BaseOverlay.h"
40#include "ariba/overlay/messages/OverlayMsg.h"
41
42#include "Chord.h"
43#include "detail/chord_routing_table.hpp"
44
45//#include "messages/Discovery.h" // XXX DEPRECATED
46
47namespace ariba {
48namespace overlay {
49
50enum signalMessageTypes {
51 typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
52 typeLeave = OverlayMsg::typeSignalingStart + 0x02,
53};
54
55typedef chord_routing_table::item route_item;
56
57using ariba::transport::system_priority;
58
59use_logging_cpp( Chord );
60
61
62////// Messages
63struct DiscoveryMessage
64{
65 /**
66 * DiscoveryMessage
67 * - type
68 * - data
69 * - Endpoint
70 */
71
72 // type enum
73 enum type_ {
74 invalid = 0,
75 normal = 1,
76 successor = 2,
77 predecessor = 3
78 };
79
80
81 // data
82 uint8_t type;
83 uint8_t ttl;
84 EndpointDescriptor endpoint;
85
86 // serialize
87 reboost::message_t serialize()
88 {
89 // serialize endpoint
90 reboost::message_t msg = endpoint.serialize();
91
92 // serialize type and ttl
93 uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
94 buff1[0] = type;
95 buff1[1] = ttl;
96
97 return msg;
98 }
99
100 //deserialize
101 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
102 {
103 // deserialize type and ttl
104 const uint8_t* bytes = buff.data();
105 type = bytes[0];
106 ttl = bytes[1];
107
108 // deserialize endpoint
109 return endpoint.deserialize(buff(2*sizeof(uint8_t)));
110 }
111};
112
113
114Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
115 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
116 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
117
118 // create routing table
119 this->table = new chord_routing_table(_nodeid, 4);
120 orphan_removal_counter = 0;
121 discovery_count = 0;
122 stabilize_counter = 0;
123 stabilize_finger = 0;
124}
125
126Chord::~Chord() {
127
128 // delete routing table
129 delete table;
130}
131
132/// helper: sets up a link using the base overlay
133LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
134
135 // check if we already have a connection
136 for (size_t i=0; i<table->size(); i++)
137 if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
138 return LinkID::UNSPECIFIED;
139
140 // check if we are already trying to establish a link
141 for (size_t i=0; i<pending.size(); i++)
142 if ( pending[i] == remote ) {
143 logging_debug("Already trying to establish a link to node "
144 << remote.toString() );
145 return LinkID::UNSPECIFIED;
146 }
147
148 // adding node to list of pending connections
149 pending.push_back( remote );
150
151 logging_info("Request to setup link to " << endpoint.toString() );
152
153 // establish link via base overlay
154 return baseoverlay.establishLink( endpoint, remote,
155 OverlayInterface::OVERLAY_SERVICE_ID );
156}
157
158/// helper: sends a message using the "base overlay"
159void Chord::send( OverlayMsg* msg, const LinkID& link ) {
160 if (link.isUnspecified())
161 return;
162
163 try
164 {
165 baseoverlay.send_link( msg, link, system_priority::OVERLAY );
166 }
167 catch ( message_not_sent& e )
168 {
169 logging_warn("Chord: Could not send message over link " << link
170 << ": " << e.what());
171 }
172}
173
174void Chord::send_node( OverlayMsg* message, const NodeID& remote )
175{
176 try
177 {
178 baseoverlay.send( message, remote, system_priority::OVERLAY );
179 }
180 catch ( message_not_sent& e )
181 {
182 logging_warn("Chord: Could not send message to " << remote
183 << ": " << e.what());
184 }
185}
186
187/// sends a discovery message
188void Chord::send_discovery_to(const NodeID& remote, int ttl) {
189 LinkID link = getNextLinkId(remote);
190 if ( remote == nodeid || link.isUnspecified()) return;
191 if ( table->size() == 0 ) return;
192 ttl = 2;
193
194 OverlayMsg msg( typeDiscovery );
195 msg.setRegisterRelay(true);
196
197 // create DiscoveryMessage
198 DiscoveryMessage dmsg;
199 dmsg.type = DiscoveryMessage::normal;
200 dmsg.ttl = ttl;
201 dmsg.endpoint = baseoverlay.getEndpointDescriptor();
202
203 msg.set_payload_message(dmsg.serialize());
204
205 // send to node
206 try
207 {
208 baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
209 }
210 catch ( message_not_sent& e )
211 {
212 logging_warn("Chord: Could not send message to " << remote
213 << ": " << e.what());
214 }
215}
216
217void Chord::discover_neighbors( const LinkID& link ) {
218 uint8_t ttl = 1;
219
220 // FIXME try-catch for the send operations
221
222 // create DiscoveryMessage
223 DiscoveryMessage dmsg;
224 dmsg.ttl = ttl;
225 dmsg.endpoint = baseoverlay.getEndpointDescriptor();
226 {
227 // send predecessor discovery
228 OverlayMsg msg( typeDiscovery );
229 msg.setRegisterRelay(true);
230
231 // set type
232 dmsg.type = DiscoveryMessage::predecessor;
233
234 // send
235 msg.set_payload_message(dmsg.serialize());
236 send(&msg, link);
237 }
238 {
239 // send successor discovery
240 OverlayMsg msg( typeDiscovery );
241// msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it?
242 msg.setRegisterRelay(true);
243
244 // set type
245 dmsg.type = DiscoveryMessage::successor;
246
247 // send
248 msg.set_payload_message(dmsg.serialize());
249 send(&msg, link);
250 }
251}
252
253
254void Chord::createOverlay() {
255}
256
257void Chord::deleteOverlay() {
258
259}
260
261void Chord::joinOverlay(const EndpointDescriptor& boot) {
262 logging_info( "joining Chord overlay structure through end-point " <<
263 (boot.isUnspecified() ? "local" : boot.toString()) );
264
265 // initiator? no->setup first link
266 if (!boot.isUnspecified())
267 bootstrapLinks.push_back( setup(boot) );
268
269 // timer for stabilization management
270// Timer::setInterval(1000); // TODO find an appropriate interval!
271 Timer::setInterval(10000); // XXX testing...
272 Timer::start();
273}
274
275void Chord::leaveOverlay() {
276 Timer::stop();
277 for (size_t i = 0; i < table->size(); i++) {
278 route_item* it = (*table)[i];
279 OverlayMsg msg( typeLeave );
280 send( &msg, it->info );
281 }
282}
283
284/// @see OverlayInterface.h
285const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
286 const route_item* item = table->get(node);
287 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
288 return baseoverlay.getEndpointDescriptor(item->info);
289}
290
291/// @see OverlayInterface.h
292bool Chord::isClosestNodeTo( const NodeID& node ) {
293 return table->is_closest_to(node);
294}
295
296/// @see OverlayInterface.h
297const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
298 // get next hop
299 const route_item* item = table->get_next_hop(id);
300
301 // returns a unspecified id when this is itself
302 if (item == NULL || item->id == nodeid)
303 return LinkID::UNSPECIFIED;
304
305 /// return routing info
306 return item->info;
307}
308
309std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode(
310 const NodeID& id, int num ) const
311{
312 std::vector<const LinkID*> ret;
313
314 switch ( num )
315 {
316 // special case: just call »getNextLinkId«
317 case 1:
318 {
319 ret.push_back(&getNextLinkId(id));
320
321 break;
322 }
323
324 // * calculate top 2 *
325 case 0:
326 case 2:
327 {
328 std::vector<const route_item*> items = table->get_next_2_hops(id);
329
330 ret.reserve(items.size());
331
332 BOOST_FOREACH( const route_item* item, items )
333 {
334 ret.push_back(&item->info);
335 }
336
337 break;
338 }
339
340 // NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
341 default:
342 {
343 throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");
344
345 break;
346 }
347 }
348
349 return ret;
350}
351
352
353/// @see OverlayInterface.h
354const NodeID& Chord::getNextNodeId( const NodeID& id ) const {
355 // get next hop
356 const route_item* item = table->get_next_hop(id);
357
358 // return unspecified if no next hop could be found
359 if (item == NULL) {
360 return NodeID::UNSPECIFIED;
361 }
362
363 return item->id;
364}
365
366OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
367 OverlayInterface::NodeList nodelist;
368
369 if( deep ){
370 // all nodes that I know, fingers, succ/pred
371 for (size_t i = 0; i < table->size(); i++){
372 if ((*table)[i]->ref_count != 0
373 && !(*table)[i]->info.isUnspecified())
374 nodelist.push_back((*table)[i]->id);
375 }
376 } else {
377 // only succ and pred
378 if( table->get_predesessor() != NULL ){
379 nodelist.push_back( *(table->get_predesessor()) );
380 }
381 if( table->get_successor() != NULL ){
382 OverlayInterface::NodeList::iterator i =
383 std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
384 if( i == nodelist.end() )
385 nodelist.push_back( *(table->get_successor()) );
386 }
387 }
388
389 return nodelist;
390}
391
392/// @see CommunicationListener.h
393/// @see OverlayInterface.h
394void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
395 logging_info("link_up: link=" << lnk.toString() << " remote=" <<
396 remote.toString() );
397 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
398 if (*i == remote) {
399 pending.erase(i);
400 break;
401 }
402
403 if (remote==nodeid) {
404 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
405 logging_warn("NodeID: " << remote);
406 baseoverlay.dropLink(lnk);
407 return;
408 }
409
410 route_item* item = table->insert(remote);
411
412 // item added to routing table?
413 if (item != NULL) { // yes-> add to routing table
414 logging_info("new routing neighbor: " << remote.toString()
415 << " with link " << lnk.toString());
416
417 // replace with new link if link is "better"
418 if (item->info!=lnk && item->info.isUnspecified()==false) {
419 if (baseoverlay.compare( item->info, lnk ) == 1) {
420 logging_info("Replacing link due to concurrent link establishment.");
421 baseoverlay.dropLink(item->info);
422 item->info = lnk;
423 }
424 } else {
425 item->info = lnk;
426 }
427
428 // discover neighbors of new overlay neighbor
429 showLinks();
430 } else { // no-> add orphan entry to routing table
431 logging_info("new orphan: " << remote.toString()
432 << " with link " << lnk.toString());
433 table->insert_orphan(remote)->info = lnk;
434 }
435
436 // erase bootstrap link
437 vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
438 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
439}
440
441/// @see CommunicationListener.h or @see OverlayInterface.h
442void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
443 // XXX logging_debug
444 logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
445 remote.toString() );
446
447 // remove link from routing table
448 route_item* item = table->get(remote);
449 if (item!=NULL && item->info==lnk) {
450 item->info = LinkID::UNSPECIFIED;
451 table->remove(remote);
452 }
453}
454
455/// @see CommunicationListener.h
456/// @see OverlayInterface.h
457void Chord::onMessage(OverlayMsg* msg,
458 reboost::shared_buffer_t sub_msg,
459 const NodeID& remote,
460 const LinkID& link) {
461
462 // handle messages
463 switch ((signalMessageTypes) msg->getType()) {
464
465 // discovery request
466 case typeDiscovery:
467 {
468 // deserialize discovery message
469 DiscoveryMessage dmsg;
470 dmsg.deserialize(sub_msg);
471
472 logging_debug("Received discovery message with"
473 << " src=" << msg->getSourceNode().toString()
474 << " dst=" << msg->getDestinationNode().toString()
475 << " ttl=" << (int)dmsg.ttl
476 << " type=" << (int)dmsg.type
477 );
478
479 // add discovery node id
480 bool found = false;
481 BOOST_FOREACH( NodeID& value, discovery )
482 if (value == msg->getSourceNode()) {
483 found = true;
484 break;
485 }
486 if (!found) discovery.push_back(msg->getSourceNode());
487
488 // check if source node can be added to routing table and setup link
489 if (msg->getSourceNode() != nodeid)
490 setup( dmsg.endpoint, msg->getSourceNode() );
491
492 // process discovery message -------------------------- switch start --
493 switch ( dmsg.type )
494 {
495 // normal: route discovery message like every other message
496 case DiscoveryMessage::normal:
497 {
498 // closest node? yes-> split to follow successor and predecessor
499 if ( table->is_closest_to(msg->getDestinationNode()) )
500 {
501 logging_debug("Discovery split:");
502 if (!table->get_successor()->isUnspecified())
503 {
504 OverlayMsg omsg(*msg);
505
506 dmsg.type = DiscoveryMessage::successor;
507 omsg.set_payload_message(dmsg.serialize());
508
509 logging_debug("* Routing to successor "
510 << table->get_successor()->toString() );
511 send_node( &omsg, *table->get_successor() );
512 }
513
514 // send predecessor message
515 if (!table->get_predesessor()->isUnspecified())
516 {
517 OverlayMsg omsg(*msg);
518
519 dmsg.type = DiscoveryMessage::predecessor;
520 omsg.set_payload_message(dmsg.serialize());
521
522 logging_debug("* Routing to predecessor "
523 << table->get_predesessor()->toString() );
524 send_node( &omsg, *table->get_predesessor() );
525 }
526 }
527 // no-> route message
528 else
529 {
530 baseoverlay.route( msg );
531 }
532 break;
533 }
534
535 // successor mode: follow the successor until TTL is zero
536 case DiscoveryMessage::successor:
537 case DiscoveryMessage::predecessor:
538 {
539 // reached destination? no->forward!
540 if (msg->getDestinationNode() != nodeid)
541 {
542 OverlayMsg omsg(*msg);
543 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
544
545 omsg.set_payload_message(dmsg.serialize());
546
547 baseoverlay.route( &omsg );
548 break;
549 }
550
551 // time to live ended? yes-> stop routing
552 if (dmsg.ttl == 0 || dmsg.ttl > 10) break;
553
554 // decrease time-to-live
555 dmsg.ttl--;
556
557 const route_item* item = NULL;
558 if (dmsg.type == DiscoveryMessage::successor &&
559 table->get_successor() != NULL)
560 {
561 item = table->get(*table->get_successor());
562 }
563 else if (table->get_predesessor() != NULL)
564 {
565 item = table->get(*table->get_predesessor());
566 }
567 if (item == NULL)
568 break;
569
570 logging_debug("Routing discovery message to succ/pred "
571 << item->id.toString() );
572 OverlayMsg omsg(*msg);
573 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
574 omsg.setDestinationNode(item->id);
575
576 omsg.set_payload_message(dmsg.serialize());
577
578 send_node( &omsg, omsg.getDestinationNode() );
579 break;
580 }
581 case DiscoveryMessage::invalid:
582 break;
583
584 default:
585 break;
586 }
587 // process discovery message ---------------------------- switch end --
588
589 break;
590 }
591
592 // leave
593 case typeLeave: {
594 if (link!=LinkID::UNSPECIFIED) {
595 route_item* item = table->get(remote);
596 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
597 table->remove(remote);
598 baseoverlay.dropLink(link);
599 }
600 break;
601 }
602 }
603}
604
605void Chord::eventFunction() {
606 stabilize_counter++;
607 if (stabilize_counter < 0 || stabilize_counter == 2) {
608
609 // reset counter
610 stabilize_counter = 0;
611
612 // clear pending connections
613 pending.clear();
614
615 // get number of real neighbors
616 size_t numNeighbors = 0;
617 for (size_t i = 0; i < table->size(); i++) {
618 route_item* it = (*table)[i];
619 if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
620 }
621 logging_info("Running stabilization: #links="
622 << table->size() << " #neighbors=" << numNeighbors );
623
624 // updating neighbors
625 logging_debug("Discover new ring neighbors");
626 for (size_t i=0; i<table->size(); i++) {
627 LinkID id = (*table)[i]->info;
628 if (!id.isUnspecified()) discover_neighbors(id);
629 }
630
631 // sending discovery
632 logging_debug("Sending discovery message to my neighbors and fingers");
633 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
634 const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
635 if (disc != nodeid)
636 send_discovery_to(disc);
637
638 // remove orphan links
639 orphan_removal_counter++;
640 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
641 logging_info("Discovered nodes: ");
642 BOOST_FOREACH( NodeID& id, discovery )
643 logging_info("* " << id.toString());
644 discovery.clear();
645 logging_info("Running orphan removal");
646 orphan_removal_counter = 0;
647 for (size_t i = 0; i < table->size(); i++) {
648 route_item* it = (*table)[i];
649 if (it->ref_count == 0 && !it->info.isUnspecified()) {
650 logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
651 table->insert(it->id);
652 if (it->ref_count==0) {
653 LinkID id = it->info;
654 it->info = LinkID::UNSPECIFIED;
655 baseoverlay.dropLink(id);
656 }
657 }
658 }
659 }
660 }
661}
662
663void Chord::showLinks() {
664 logging_info("--- chord routing information ----------------------------------");
665 logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
666 table->get_predesessor()->toString()) );
667 logging_info("node_id : " << nodeid.toString() );
668 logging_info("successor : " << (table->get_successor()==NULL? "<none>" :
669 table->get_successor()->toString()));
670 logging_info("----------------------------------------------------------------");
671}
672
673/// @see OverlayInterface.h
674std::string Chord::debugInformation() const {
675 std::ostringstream s;
676 s << "protocol : Chord" << endl;
677 s << "node_id : " << nodeid.toString() << endl;
678 s << "predecessor: " << (table->get_predesessor()==NULL? "<none>" :
679 table->get_predesessor()->toString()) << endl;
680 s << "successor : " << (table->get_successor()==NULL? "<none>" :
681 table->get_successor()->toString()) << endl;
682 s << "nodes: " << endl;
683 for (size_t i = 0; i < table->size(); i++) {
684 route_item* it = (*table)[i];
685 if (it->ref_count != 0 && !it->info.isUnspecified()) {
686 s << it->id.toString().substr(0,6)
687 << " using " << it->info.toString().substr(0,6) << endl;
688 }
689 }
690 return s.str();
691}
692
693
694
695}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.