00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "ariba/overlay/BaseOverlay.h"
00040 #include "ariba/overlay/messages/OverlayMsg.h"
00041
00042 #include "Chord.h"
00043 #include "detail/chord_routing_table.hpp"
00044
00045 #include "messages/Discovery.h"
00046
00047 namespace ariba {
00048 namespace overlay {
00049
00050 enum signalMessageTypes {
00051 typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
00052 typeLeave = OverlayMsg::typeSignalingStart + 0x02,
00053 };
00054
00055 typedef chord_routing_table::item route_item;
00056
00057 use_logging_cpp( Chord );
00058
00059 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
00060 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
00061 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
00062
00063
00064 this->table = new chord_routing_table(_nodeid, 4);
00065 orphan_removal_counter = 0;
00066 discovery_count = 0;
00067 stabilize_counter = 0;
00068 stabilize_finger = 0;
00069 }
00070
00071 Chord::~Chord() {
00072
00073
00074 delete table;
00075 }
00076
00078 LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
00079
00080
00081 for (size_t i=0; i<table->size(); i++)
00082 if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
00083 return LinkID::UNSPECIFIED;
00084
00085
00086 for (size_t i=0; i<pending.size(); i++)
00087 if ( pending[i] == remote ) {
00088 logging_debug("Already trying to establish a link to node "
00089 << remote.toString() );
00090 return LinkID::UNSPECIFIED;
00091 }
00092
00093
00094 pending.push_back( remote );
00095
00096 logging_info("Request to setup link to " << endpoint.toString() );
00097
00098
00099 return baseoverlay.establishLink( endpoint, remote,
00100 OverlayInterface::OVERLAY_SERVICE_ID );
00101 }
00102
00104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
00105 if (link.isUnspecified()) return 0;
00106 return baseoverlay.send_link( msg, link );
00107 }
00108
00110 void Chord::send_discovery_to(const NodeID& remote, int ttl) {
00111 LinkID link = getNextLinkId(remote);
00112 if ( remote == nodeid || link.isUnspecified()) return;
00113 if ( table->size() == 0 ) return;
00114 ttl = 2;
00115
00116 OverlayMsg msg( typeDiscovery );
00117 msg.setRegisterRelay(true);
00118 Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() );
00119 msg.encapsulate(&dmsg);
00120
00121
00122 baseoverlay.send_node( &msg, remote );
00123 }
00124
00125 void Chord::discover_neighbors( const LinkID& link ) {
00126 uint8_t ttl = 1;
00127 {
00128
00129 OverlayMsg msg( typeDiscovery );
00130 msg.setRegisterRelay(true);
00131 Discovery dmsg( Discovery::predecessor, ttl,
00132 baseoverlay.getEndpointDescriptor() );
00133 msg.encapsulate(&dmsg);
00134 send(&msg, link);
00135 }
00136 {
00137
00138 OverlayMsg msg( typeDiscovery );
00139 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
00140 msg.setRegisterRelay(true);
00141 Discovery dmsg( Discovery::successor, ttl,
00142 baseoverlay.getEndpointDescriptor() );
00143 msg.encapsulate(&dmsg);
00144 send(&msg, link);
00145 }
00146 }
00147
00148
00149 void Chord::createOverlay() {
00150 }
00151
00152 void Chord::deleteOverlay() {
00153
00154 }
00155
00156 void Chord::joinOverlay(const EndpointDescriptor& boot) {
00157 logging_info( "joining Chord overlay structure through end-point " <<
00158 (boot.isUnspecified() ? "local" : boot.toString()) );
00159
00160
00161 if (!boot.isUnspecified())
00162 bootstrapLinks.push_back( setup(boot) );
00163
00164
00165 Timer::setInterval(1000);
00166 Timer::start();
00167 }
00168
00169 void Chord::leaveOverlay() {
00170 Timer::stop();
00171 for (size_t i = 0; i < table->size(); i++) {
00172 route_item* it = (*table)[i];
00173 OverlayMsg msg( typeLeave );
00174 send( &msg, it->info );
00175 }
00176 }
00177
00179 const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
00180 const route_item* item = table->get(node);
00181 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
00182 return baseoverlay.getEndpointDescriptor(item->info);
00183 }
00184
00186 bool Chord::isClosestNodeTo( const NodeID& node ) {
00187 return table->is_closest_to(node);
00188 }
00189
00191 const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
00192
00193 const route_item* item = table->get_next_hop(id);
00194
00195
00196 if (item == NULL || item->id == nodeid)
00197 return LinkID::UNSPECIFIED;
00198
00200 return item->info;
00201 }
00202
00203 OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
00204 OverlayInterface::NodeList nodelist;
00205
00206 if( deep ){
00207
00208 for (size_t i = 0; i < table->size(); i++){
00209 if ((*table)[i]->ref_count != 0
00210 && !(*table)[i]->info.isUnspecified())
00211 nodelist.push_back((*table)[i]->id);
00212 }
00213 } else {
00214
00215 if( table->get_predesessor() != NULL ){
00216 nodelist.push_back( *(table->get_predesessor()) );
00217 }
00218 if( table->get_successor() != NULL ){
00219 OverlayInterface::NodeList::iterator i =
00220 std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
00221 if( i == nodelist.end() )
00222 nodelist.push_back( *(table->get_successor()) );
00223 }
00224 }
00225
00226 return nodelist;
00227 }
00228
00231 void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
00232 logging_info("link_up: link=" << lnk.toString() << " remote=" <<
00233 remote.toString() );
00234 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
00235 if (*i == remote) {
00236 pending.erase(i);
00237 break;
00238 }
00239
00240 if (remote==nodeid) {
00241 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
00242 baseoverlay.dropLink(lnk);
00243 return;
00244 }
00245
00246 route_item* item = table->insert(remote);
00247
00248
00249 if (item != NULL) {
00250 logging_info("new routing neighbor: " << remote.toString()
00251 << " with link " << lnk.toString());
00252
00253
00254 if (item->info!=lnk && item->info.isUnspecified()==false) {
00255 if (baseoverlay.compare( item->info, lnk ) == 1) {
00256 logging_info("Replacing link due to concurrent link establishment.");
00257 baseoverlay.dropLink(item->info);
00258 item->info = lnk;
00259 }
00260 } else {
00261 item->info = lnk;
00262 }
00263
00264
00265 showLinks();
00266 } else {
00267 logging_info("new orphan: " << remote.toString()
00268 << " with link " << lnk.toString());
00269 table->insert_orphan(remote)->info = lnk;
00270 }
00271
00272
00273 vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
00274 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
00275 }
00276
00278 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
00279 logging_debug("link_down: link=" << lnk.toString() << " remote=" <<
00280 remote.toString() );
00281
00282
00283 route_item* item = table->get(remote);
00284 if (item!=NULL && item->info==lnk) {
00285 item->info = LinkID::UNSPECIFIED;
00286 table->remove(remote);
00287 }
00288 }
00289
00292 void Chord::onMessage(const DataMessage& msg, const NodeID& remote,
00293 const LinkID& link) {
00294
00295
00296 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
00297 if (m == NULL) return;
00298
00299
00300 switch ((signalMessageTypes)m->getType()) {
00301
00302
00303 case typeDiscovery: {
00304
00305 Discovery* dmsg = m->decapsulate<Discovery> ();
00306 logging_debug("Received discovery message with"
00307 << " src=" << m->getSourceNode().toString()
00308 << " dst=" << m->getDestinationNode().toString()
00309 << " ttl=" << (int)dmsg->getTTL()
00310 << " type=" << (int)dmsg->getType()
00311 );
00312
00313
00314 bool found = false;
00315 BOOST_FOREACH( NodeID& value, discovery )
00316 if (value == m->getSourceNode()) {
00317 found = true;
00318 break;
00319 }
00320 if (!found) discovery.push_back(m->getSourceNode());
00321
00322
00323 if (m->getSourceNode() != nodeid)
00324 setup( dmsg->getEndpoint(), m->getSourceNode() );
00325
00326
00327 switch (dmsg->getType()) {
00328
00329
00330 case Discovery::normal: {
00331
00332 if ( table->is_closest_to(m->getDestinationNode()) ) {
00333 logging_debug("Discovery split:");
00334 if (!table->get_successor()->isUnspecified()) {
00335 OverlayMsg omsg(*m);
00336 dmsg->setType(Discovery::successor);
00337 omsg.encapsulate(dmsg);
00338 logging_debug("* Routing to successor "
00339 << table->get_successor()->toString() );
00340 baseoverlay.send( &omsg, *table->get_successor() );
00341 }
00342
00343
00344 if (!table->get_predesessor()->isUnspecified()) {
00345 OverlayMsg omsg(*m);
00346 dmsg->setType(Discovery::predecessor);
00347 omsg.encapsulate(dmsg);
00348 logging_debug("* Routing to predecessor "
00349 << table->get_predesessor()->toString() );
00350 baseoverlay.send( &omsg, *table->get_predesessor() );
00351 }
00352 }
00353
00354 else {
00355 baseoverlay.route( m );
00356 }
00357 break;
00358 }
00359
00360
00361 case Discovery::successor:
00362 case Discovery::predecessor: {
00363
00364 if (m->getDestinationNode() != nodeid) {
00365 OverlayMsg omsg(*m);
00366 omsg.encapsulate(dmsg);
00367 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
00368 baseoverlay.route( &omsg );
00369 break;
00370 }
00371
00372
00373 if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break;
00374
00375
00376 dmsg->setTTL(dmsg->getTTL() - 1);
00377
00378 const route_item* item = NULL;
00379 if (dmsg->getType() == Discovery::successor &&
00380 table->get_successor() != NULL) {
00381 item = table->get(*table->get_successor());
00382 } else {
00383 if (table->get_predesessor()!=NULL)
00384 item = table->get(*table->get_predesessor());
00385 }
00386 if (item == NULL)
00387 break;
00388
00389 logging_debug("Routing discovery message to succ/pred "
00390 << item->id.toString() );
00391 OverlayMsg omsg(*m);
00392 omsg.encapsulate(dmsg);
00393 omsg.setDestinationNode(item->id);
00394 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
00395 baseoverlay.send(&omsg, omsg.getDestinationNode());
00396 break;
00397 }
00398 case Discovery::invalid:
00399 break;
00400
00401 default:
00402 break;
00403 }
00404
00405
00406 delete dmsg;
00407 break;
00408 }
00409
00410
00411 case typeLeave: {
00412 if (link!=LinkID::UNSPECIFIED) {
00413 route_item* item = table->get(remote);
00414 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
00415 table->remove(remote);
00416 baseoverlay.dropLink(link);
00417 }
00418 break;
00419 }}
00420 }
00421
00422 void Chord::eventFunction() {
00423 stabilize_counter++;
00424 if (stabilize_counter < 0 || stabilize_counter == 2) {
00425
00426
00427 stabilize_counter = 0;
00428
00429
00430 pending.clear();
00431
00432
00433 size_t numNeighbors = 0;
00434 for (size_t i = 0; i < table->size(); i++) {
00435 route_item* it = (*table)[i];
00436 if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
00437 }
00438 logging_info("Running stabilization: #links="
00439 << table->size() << " #neighbors=" << numNeighbors );
00440
00441
00442 logging_debug("Discover new ring neighbors");
00443 for (size_t i=0; i<table->size(); i++) {
00444 LinkID id = (*table)[i]->info;
00445 if (!id.isUnspecified()) discover_neighbors(id);
00446 }
00447
00448
00449 logging_debug("Sending discovery message to my neighbors and fingers");
00450 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
00451 const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
00452 if (disc != nodeid)
00453 send_discovery_to(disc);
00454
00455
00456 orphan_removal_counter++;
00457 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
00458 logging_info("Discovered nodes: ");
00459 BOOST_FOREACH( NodeID& id, discovery )
00460 logging_info("* " << id.toString());
00461 discovery.clear();
00462 logging_info("Running orphan removal");
00463 orphan_removal_counter = 0;
00464 for (size_t i = 0; i < table->size(); i++) {
00465 route_item* it = (*table)[i];
00466 if (it->ref_count == 0 && !it->info.isUnspecified()) {
00467 logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
00468 table->insert(it->id);
00469 if (it->ref_count==0) {
00470 LinkID id = it->info;
00471 it->info = LinkID::UNSPECIFIED;
00472 baseoverlay.dropLink(id);
00473 }
00474 }
00475 }
00476 }
00477 }
00478 }
00479
00480 void Chord::showLinks() {
00481 logging_info("--- chord routing information ----------------------------------");
00482 logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
00483 table->get_predesessor()->toString()) );
00484 logging_info("node_id : " << nodeid.toString() );
00485 logging_info("successor : " << (table->get_successor()==NULL? "<none>" :
00486 table->get_successor()->toString()));
00487 logging_info("----------------------------------------------------------------");
00488 }
00489
00491 std::string Chord::debugInformation() const {
00492 std::ostringstream s;
00493 s << "protocol : Chord" << endl;
00494 s << "node_id : " << nodeid.toString() << endl;
00495 s << "predecessor: " << (table->get_predesessor()==NULL? "<none>" :
00496 table->get_predesessor()->toString()) << endl;
00497 s << "successor : " << (table->get_successor()==NULL? "<none>" :
00498 table->get_successor()->toString()) << endl;
00499 s << "nodes: " << endl;
00500 for (size_t i = 0; i < table->size(); i++) {
00501 route_item* it = (*table)[i];
00502 if (it->ref_count != 0 && !it->info.isUnspecified()) {
00503 s << it->id.toString().substr(0,6)
00504 << " using " << it->info.toString().substr(0,6) << endl;
00505 }
00506 }
00507 return s.str();
00508 }
00509
00510
00511
00512 }}