close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/modules/chord/Chord.cpp@ 12761

Last change on this file since 12761 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 20.1 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "ariba/overlay/BaseOverlay.h"
40#include "ariba/overlay/messages/OverlayMsg.h"
41
42#include "Chord.h"
43#include "detail/chord_routing_table.hpp"
44
45//#include "messages/Discovery.h" // XXX DEPRECATED
46
47namespace ariba {
48namespace overlay {
49
50enum signalMessageTypes {
51 typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
52 typeLeave = OverlayMsg::typeSignalingStart + 0x02,
53};
54
55typedef chord_routing_table::item route_item;
56
57using ariba::transport::system_priority;
58
59use_logging_cpp( Chord );
60
61
62////// Messages
63struct DiscoveryMessage
64{
65 /**
66 * DiscoveryMessage
67 * - type
68 * - data
69 * - Endpoint
70 */
71
72 // type enum
73 enum type_ {
74 invalid = 0,
75 normal = 1,
76 successor = 2,
77 predecessor = 3
78 };
79
80
81 // data
82 uint8_t type;
83 uint8_t ttl;
84 EndpointDescriptor endpoint;
85
86 // serialize
87 reboost::message_t serialize()
88 {
89 // serialize endpoint
90 reboost::message_t msg = endpoint.serialize();
91
92 // serialize type and ttl
93 uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
94 buff1[0] = type;
95 buff1[1] = ttl;
96
97 return msg;
98 }
99
100 //deserialize
101 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
102 {
103 // deserialize type and ttl
104 const uint8_t* bytes = buff.data();
105 type = bytes[0];
106 ttl = bytes[1];
107
108 // deserialize endpoint
109 return endpoint.deserialize(buff(2*sizeof(uint8_t)));
110 }
111};
112
113
114Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
115 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
116 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
117
118 // create routing table
119 this->table = new chord_routing_table(_nodeid, 4);
120 orphan_removal_counter = 0;
121 discovery_count = 0;
122 stabilize_counter = 0;
123 stabilize_finger = 0;
124}
125
126Chord::~Chord() {
127
128 // delete routing table
129 delete table;
130}
131
132/// helper: sets up a link using the base overlay
133LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
134
135 // check if we already have a connection
136 for (size_t i=0; i<table->size(); i++)
137 if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
138 return LinkID::UNSPECIFIED;
139
140 // check if we are already trying to establish a link
141 for (size_t i=0; i<pending.size(); i++)
142 if ( pending[i] == remote ) {
143 logging_debug("Already trying to establish a link to node "
144 << remote.toString() );
145 return LinkID::UNSPECIFIED;
146 }
147
148 // adding node to list of pending connections
149 pending.push_back( remote );
150
151 logging_info("Request to setup link to " << endpoint.toString() );
152
153 // establish link via base overlay
154 return baseoverlay.establishLink( endpoint, remote,
155 OverlayInterface::OVERLAY_SERVICE_ID );
156}
157
158/// helper: sends a message using the "base overlay"
159void Chord::send( OverlayMsg* msg, const LinkID& link ) {
160 if (link.isUnspecified())
161 return;
162
163 baseoverlay.send_link( msg, link, system_priority::OVERLAY );
164}
165
166void Chord::send_node( OverlayMsg* message, const NodeID& remote )
167{
168 try
169 {
170 baseoverlay.send( message, remote, system_priority::OVERLAY );
171 }
172 catch ( message_not_sent& e )
173 {
174 logging_warn("Chord: Could not send message to " << remote
175 << ": " << e.what());
176 }
177}
178
179/// sends a discovery message
180void Chord::send_discovery_to(const NodeID& remote, int ttl) {
181 LinkID link = getNextLinkId(remote);
182 if ( remote == nodeid || link.isUnspecified()) return;
183 if ( table->size() == 0 ) return;
184 ttl = 2;
185
186 OverlayMsg msg( typeDiscovery );
187 msg.setRegisterRelay(true);
188
189 // create DiscoveryMessage
190 DiscoveryMessage dmsg;
191 dmsg.type = DiscoveryMessage::normal;
192 dmsg.ttl = ttl;
193 dmsg.endpoint = baseoverlay.getEndpointDescriptor();
194
195 msg.set_payload_message(dmsg.serialize());
196
197 // send to node
198 try
199 {
200 baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
201 }
202 catch ( message_not_sent& e )
203 {
204 logging_warn("Chord: Could not send message to " << remote
205 << ": " << e.what());
206 }
207}
208
209void Chord::discover_neighbors( const LinkID& link ) {
210 uint8_t ttl = 1;
211
212 // FIXME try-catch for the send operations
213
214 // create DiscoveryMessage
215 DiscoveryMessage dmsg;
216 dmsg.ttl = ttl;
217 dmsg.endpoint = baseoverlay.getEndpointDescriptor();
218 {
219 // send predecessor discovery
220 OverlayMsg msg( typeDiscovery );
221 msg.setRegisterRelay(true);
222
223 // set type
224 dmsg.type = DiscoveryMessage::predecessor;
225
226 // send
227 msg.set_payload_message(dmsg.serialize());
228 send(&msg, link);
229 }
230 {
231 // send successor discovery
232 OverlayMsg msg( typeDiscovery );
233// msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it?
234 msg.setRegisterRelay(true);
235
236 // set type
237 dmsg.type = DiscoveryMessage::successor;
238
239 // send
240 msg.set_payload_message(dmsg.serialize());
241 send(&msg, link);
242 }
243}
244
245
246void Chord::createOverlay() {
247}
248
249void Chord::deleteOverlay() {
250
251}
252
253void Chord::joinOverlay(const EndpointDescriptor& boot) {
254 logging_info( "joining Chord overlay structure through end-point " <<
255 (boot.isUnspecified() ? "local" : boot.toString()) );
256
257 // initiator? no->setup first link
258 if (!boot.isUnspecified())
259 bootstrapLinks.push_back( setup(boot) );
260
261 // timer for stabilization management
262// Timer::setInterval(1000); // TODO find an appropriate interval!
263 Timer::setInterval(10000); // XXX testing...
264 Timer::start();
265}
266
267void Chord::leaveOverlay() {
268 Timer::stop();
269 for (size_t i = 0; i < table->size(); i++) {
270 route_item* it = (*table)[i];
271 OverlayMsg msg( typeLeave );
272 send( &msg, it->info );
273 }
274}
275
276/// @see OverlayInterface.h
277const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
278 const route_item* item = table->get(node);
279 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
280 return baseoverlay.getEndpointDescriptor(item->info);
281}
282
283/// @see OverlayInterface.h
284bool Chord::isClosestNodeTo( const NodeID& node ) {
285 return table->is_closest_to(node);
286}
287
288/// @see OverlayInterface.h
289const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
290 // get next hop
291 const route_item* item = table->get_next_hop(id);
292
293 // returns a unspecified id when this is itself
294 if (item == NULL || item->id == nodeid)
295 return LinkID::UNSPECIFIED;
296
297 /// return routing info
298 return item->info;
299}
300
301std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode(
302 const NodeID& id, int num ) const
303{
304 std::vector<const LinkID*> ret;
305
306 switch ( num )
307 {
308 // special case: just call »getNextLinkId«
309 case 1:
310 {
311 ret.push_back(&getNextLinkId(id));
312
313 break;
314 }
315
316 // * calculate top 2 *
317 case 0:
318 case 2:
319 {
320 std::vector<const route_item*> items = table->get_next_2_hops(id);
321
322 ret.reserve(items.size());
323
324 BOOST_FOREACH( const route_item* item, items )
325 {
326 ret.push_back(&item->info);
327 }
328
329 break;
330 }
331
332 // NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
333 default:
334 {
335 throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");
336
337 break;
338 }
339 }
340
341 return ret;
342}
343
344
345/// @see OverlayInterface.h
346const NodeID& Chord::getNextNodeId( const NodeID& id ) const {
347 // get next hop
348 const route_item* item = table->get_next_hop(id);
349
350 // return unspecified if no next hop could be found
351 if (item == NULL) {
352 return NodeID::UNSPECIFIED;
353 }
354
355 return item->id;
356}
357
358OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
359 OverlayInterface::NodeList nodelist;
360
361 if( deep ){
362 // all nodes that I know, fingers, succ/pred
363 for (size_t i = 0; i < table->size(); i++){
364 if ((*table)[i]->ref_count != 0
365 && !(*table)[i]->info.isUnspecified())
366 nodelist.push_back((*table)[i]->id);
367 }
368 } else {
369 // only succ and pred
370 if( table->get_predesessor() != NULL ){
371 nodelist.push_back( *(table->get_predesessor()) );
372 }
373 if( table->get_successor() != NULL ){
374 OverlayInterface::NodeList::iterator i =
375 std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
376 if( i == nodelist.end() )
377 nodelist.push_back( *(table->get_successor()) );
378 }
379 }
380
381 return nodelist;
382}
383
384/// @see CommunicationListener.h
385/// @see OverlayInterface.h
386void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
387 logging_info("link_up: link=" << lnk.toString() << " remote=" <<
388 remote.toString() );
389 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
390 if (*i == remote) {
391 pending.erase(i);
392 break;
393 }
394
395 if (remote==nodeid) {
396 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
397 logging_warn("NodeID: " << remote);
398 baseoverlay.dropLink(lnk);
399 return;
400 }
401
402 route_item* item = table->insert(remote);
403
404 // item added to routing table?
405 if (item != NULL) { // yes-> add to routing table
406 logging_info("new routing neighbor: " << remote.toString()
407 << " with link " << lnk.toString());
408
409 // replace with new link if link is "better"
410 if (item->info!=lnk && item->info.isUnspecified()==false) {
411 if (baseoverlay.compare( item->info, lnk ) == 1) {
412 logging_info("Replacing link due to concurrent link establishment.");
413 baseoverlay.dropLink(item->info);
414 item->info = lnk;
415 }
416 } else {
417 item->info = lnk;
418 }
419
420 // discover neighbors of new overlay neighbor
421 showLinks();
422 } else { // no-> add orphan entry to routing table
423 logging_info("new orphan: " << remote.toString()
424 << " with link " << lnk.toString());
425 table->insert_orphan(remote)->info = lnk;
426 }
427
428 // erase bootstrap link
429 vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
430 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
431}
432
433/// @see CommunicationListener.h or @see OverlayInterface.h
434void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
435 // XXX logging_debug
436 logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
437 remote.toString() );
438
439 // remove link from routing table
440 route_item* item = table->get(remote);
441 if (item!=NULL && item->info==lnk) {
442 item->info = LinkID::UNSPECIFIED;
443 table->remove(remote);
444 }
445}
446
447/// @see CommunicationListener.h
448/// @see OverlayInterface.h
449void Chord::onMessage(OverlayMsg* msg,
450 reboost::shared_buffer_t sub_msg,
451 const NodeID& remote,
452 const LinkID& link) {
453
454 // handle messages
455 switch ((signalMessageTypes) msg->getType()) {
456
457 // discovery request
458 case typeDiscovery:
459 {
460 // deserialize discovery message
461 DiscoveryMessage dmsg;
462 dmsg.deserialize(sub_msg);
463
464 logging_debug("Received discovery message with"
465 << " src=" << msg->getSourceNode().toString()
466 << " dst=" << msg->getDestinationNode().toString()
467 << " ttl=" << (int)dmsg.ttl
468 << " type=" << (int)dmsg.type
469 );
470
471 // add discovery node id
472 bool found = false;
473 BOOST_FOREACH( NodeID& value, discovery )
474 if (value == msg->getSourceNode()) {
475 found = true;
476 break;
477 }
478 if (!found) discovery.push_back(msg->getSourceNode());
479
480 // check if source node can be added to routing table and setup link
481 if (msg->getSourceNode() != nodeid)
482 setup( dmsg.endpoint, msg->getSourceNode() );
483
484 // process discovery message -------------------------- switch start --
485 switch ( dmsg.type )
486 {
487 // normal: route discovery message like every other message
488 case DiscoveryMessage::normal:
489 {
490 // closest node? yes-> split to follow successor and predecessor
491 if ( table->is_closest_to(msg->getDestinationNode()) )
492 {
493 logging_debug("Discovery split:");
494 if (!table->get_successor()->isUnspecified())
495 {
496 OverlayMsg omsg(*msg);
497
498 dmsg.type = DiscoveryMessage::successor;
499 omsg.set_payload_message(dmsg.serialize());
500
501 logging_debug("* Routing to successor "
502 << table->get_successor()->toString() );
503 send_node( &omsg, *table->get_successor() );
504 }
505
506 // send predecessor message
507 if (!table->get_predesessor()->isUnspecified())
508 {
509 OverlayMsg omsg(*msg);
510
511 dmsg.type = DiscoveryMessage::predecessor;
512 omsg.set_payload_message(dmsg.serialize());
513
514 logging_debug("* Routing to predecessor "
515 << table->get_predesessor()->toString() );
516 send_node( &omsg, *table->get_predesessor() );
517 }
518 }
519 // no-> route message
520 else
521 {
522 baseoverlay.route( msg );
523 }
524 break;
525 }
526
527 // successor mode: follow the successor until TTL is zero
528 case DiscoveryMessage::successor:
529 case DiscoveryMessage::predecessor:
530 {
531 // reached destination? no->forward!
532 if (msg->getDestinationNode() != nodeid)
533 {
534 OverlayMsg omsg(*msg);
535 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
536
537 omsg.set_payload_message(dmsg.serialize());
538
539 baseoverlay.route( &omsg );
540 break;
541 }
542
543 // time to live ended? yes-> stop routing
544 if (dmsg.ttl == 0 || dmsg.ttl > 10) break;
545
546 // decrease time-to-live
547 dmsg.ttl--;
548
549 const route_item* item = NULL;
550 if (dmsg.type == DiscoveryMessage::successor &&
551 table->get_successor() != NULL)
552 {
553 item = table->get(*table->get_successor());
554 }
555 else if (table->get_predesessor() != NULL)
556 {
557 item = table->get(*table->get_predesessor());
558 }
559 if (item == NULL)
560 break;
561
562 logging_debug("Routing discovery message to succ/pred "
563 << item->id.toString() );
564 OverlayMsg omsg(*msg);
565 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
566 omsg.setDestinationNode(item->id);
567
568 omsg.set_payload_message(dmsg.serialize());
569
570 send_node( &omsg, omsg.getDestinationNode() );
571 break;
572 }
573 case DiscoveryMessage::invalid:
574 break;
575
576 default:
577 break;
578 }
579 // process discovery message ---------------------------- switch end --
580
581 break;
582 }
583
584 // leave
585 case typeLeave: {
586 if (link!=LinkID::UNSPECIFIED) {
587 route_item* item = table->get(remote);
588 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
589 table->remove(remote);
590 baseoverlay.dropLink(link);
591 }
592 break;
593 }
594 }
595}
596
597void Chord::eventFunction() {
598 stabilize_counter++;
599 if (stabilize_counter < 0 || stabilize_counter == 2) {
600
601 // reset counter
602 stabilize_counter = 0;
603
604 // clear pending connections
605 pending.clear();
606
607 // get number of real neighbors
608 size_t numNeighbors = 0;
609 for (size_t i = 0; i < table->size(); i++) {
610 route_item* it = (*table)[i];
611 if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
612 }
613 logging_info("Running stabilization: #links="
614 << table->size() << " #neighbors=" << numNeighbors );
615
616 // updating neighbors
617 logging_debug("Discover new ring neighbors");
618 for (size_t i=0; i<table->size(); i++) {
619 LinkID id = (*table)[i]->info;
620 if (!id.isUnspecified()) discover_neighbors(id);
621 }
622
623 // sending discovery
624 logging_debug("Sending discovery message to my neighbors and fingers");
625 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
626 const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
627 if (disc != nodeid)
628 send_discovery_to(disc);
629
630 // remove orphan links
631 orphan_removal_counter++;
632 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
633 logging_info("Discovered nodes: ");
634 BOOST_FOREACH( NodeID& id, discovery )
635 logging_info("* " << id.toString());
636 discovery.clear();
637 logging_info("Running orphan removal");
638 orphan_removal_counter = 0;
639 for (size_t i = 0; i < table->size(); i++) {
640 route_item* it = (*table)[i];
641 if (it->ref_count == 0 && !it->info.isUnspecified()) {
642 logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
643 table->insert(it->id);
644 if (it->ref_count==0) {
645 LinkID id = it->info;
646 it->info = LinkID::UNSPECIFIED;
647 baseoverlay.dropLink(id);
648 }
649 }
650 }
651 }
652 }
653}
654
655void Chord::showLinks() {
656 logging_info("--- chord routing information ----------------------------------");
657 logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
658 table->get_predesessor()->toString()) );
659 logging_info("node_id : " << nodeid.toString() );
660 logging_info("successor : " << (table->get_successor()==NULL? "<none>" :
661 table->get_successor()->toString()));
662 logging_info("----------------------------------------------------------------");
663}
664
665/// @see OverlayInterface.h
666std::string Chord::debugInformation() const {
667 std::ostringstream s;
668 s << "protocol : Chord" << endl;
669 s << "node_id : " << nodeid.toString() << endl;
670 s << "predecessor: " << (table->get_predesessor()==NULL? "<none>" :
671 table->get_predesessor()->toString()) << endl;
672 s << "successor : " << (table->get_successor()==NULL? "<none>" :
673 table->get_successor()->toString()) << endl;
674 s << "nodes: " << endl;
675 for (size_t i = 0; i < table->size(); i++) {
676 route_item* it = (*table)[i];
677 if (it->ref_count != 0 && !it->info.isUnspecified()) {
678 s << it->id.toString().substr(0,6)
679 << " using " << it->info.toString().substr(0,6) << endl;
680 }
681 }
682 return s.str();
683}
684
685
686
687}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.