00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "ariba/overlay/BaseOverlay.h"
00040 #include "ariba/overlay/messages/OverlayMsg.h"
00041
00042 #include "Chord.h"
00043 #include "detail/chord_routing_table.hpp"
00044
00045 #include "messages/Discovery.h"
00046
00047 namespace ariba {
00048 namespace overlay {
00049
00050 enum signalMessageTypes {
00051 typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
00052 typeLeave = OverlayMsg::typeSignalingStart + 0x02,
00053 };
00054
00055 typedef chord_routing_table::item route_item;
00056
00057 use_logging_cpp( Chord );
00058
00059 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
00060 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
00061 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
00062
00063
00064 this->table = new chord_routing_table(_nodeid, 4);
00065 orphan_removal_counter = 0;
00066 discovery_count = 0;
00067 stabilize_counter = 0;
00068 stabilize_finger = 0;
00069 }
00070
00071 Chord::~Chord() {
00072
00073
00074 delete table;
00075 }
00076
00078 LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
00079
00080
00081 for (int i=0; i<table->size(); i++)
00082 if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
00083 return LinkID::UNSPECIFIED;
00084
00085
00086 for (size_t i=0; i<pending.size(); i++)
00087 if ( pending[i] == remote ) {
00088 logging_debug("Already trying to establish a link to node "
00089 << remote.toString() );
00090 return LinkID::UNSPECIFIED;
00091 }
00092
00093
00094 pending.push_back( remote );
00095
00096 logging_info("Request to setup link to " << endpoint.toString() );
00097
00098
00099 return baseoverlay.establishLink( endpoint, remote,
00100 OverlayInterface::OVERLAY_SERVICE_ID );
00101 }
00102
00104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
00105 if (link.isUnspecified()) return 0;
00106 return baseoverlay.send_link( msg, link );
00107 }
00108
00110 void Chord::send_discovery_to(const NodeID& remote, int ttl) {
00111 LinkID link = getNextLinkId(remote);
00112 if ( remote == nodeid || link.isUnspecified()) return;
00113 if ( table->size() == 0 ) return;
00114 ttl = 2;
00115
00116 OverlayMsg msg( typeDiscovery );
00117 msg.setRegisterRelay(true);
00118 Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() );
00119 msg.encapsulate(&dmsg);
00120
00121
00122 baseoverlay.send_node( &msg, remote );
00123 }
00124
00125 void Chord::discover_neighbors( const LinkID& link ) {
00126 uint8_t ttl = 1;
00127 {
00128
00129 OverlayMsg msg( typeDiscovery );
00130 msg.setRegisterRelay(true);
00131 Discovery dmsg( Discovery::predecessor, ttl,
00132 baseoverlay.getEndpointDescriptor() );
00133 msg.encapsulate(&dmsg);
00134 send(&msg, link);
00135 }
00136 {
00137
00138 OverlayMsg msg( typeDiscovery );
00139 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
00140 msg.setRegisterRelay(true);
00141 Discovery dmsg( Discovery::successor, ttl,
00142 baseoverlay.getEndpointDescriptor() );
00143 msg.encapsulate(&dmsg);
00144 send(&msg, link);
00145 }
00146 }
00147
00148
00149 void Chord::createOverlay() {
00150 }
00151
00152 void Chord::deleteOverlay() {
00153
00154 }
00155
00156 void Chord::joinOverlay(const EndpointDescriptor& boot) {
00157 logging_info( "joining Chord overlay structure through end-point " <<
00158 (boot.isUnspecified() ? "local" : boot.toString()) );
00159
00160
00161 if (!boot.isUnspecified())
00162 bootstrapLinks.push_back( setup(boot) );
00163
00164
00165 Timer::setInterval(1000);
00166 Timer::start();
00167 }
00168
00169 void Chord::leaveOverlay() {
00170 Timer::stop();
00171 for (size_t i = 0; i < table->size(); i++) {
00172 route_item* it = (*table)[i];
00173 OverlayMsg msg( typeLeave );
00174 send( &msg, it->info );
00175 }
00176 }
00177
00179 const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
00180 const route_item* item = table->get(node);
00181 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
00182 return baseoverlay.getEndpointDescriptor(item->info);
00183 }
00184
00186 const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
00187
00188 const route_item* item = table->get_next_hop(id);
00189
00190
00191 if (item == NULL || item->id == nodeid)
00192 return LinkID::UNSPECIFIED;
00193
00195 return item->info;
00196 }
00197
00198 OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
00199 OverlayInterface::NodeList nodelist;
00200
00201 if( deep ){
00202
00203 for (size_t i = 0; i < table->size(); i++){
00204 if ((*table)[i]->ref_count != 0
00205 && !(*table)[i]->info.isUnspecified())
00206 nodelist.push_back((*table)[i]->id);
00207 }
00208 } else {
00209
00210 if( table->get_predesessor() != NULL ){
00211 nodelist.push_back( *(table->get_predesessor()) );
00212 }
00213 if( table->get_successor() != NULL ){
00214 OverlayInterface::NodeList::iterator i =
00215 std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
00216 if( i == nodelist.end() )
00217 nodelist.push_back( *(table->get_successor()) );
00218 }
00219 }
00220
00221 return nodelist;
00222 }
00223
00226 void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
00227 logging_info("link_up: link=" << lnk.toString() << " remote=" <<
00228 remote.toString() );
00229 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
00230 if (*i == remote) {
00231 pending.erase(i);
00232 break;
00233 }
00234
00235 if (remote==nodeid) {
00236 baseoverlay.dropLink(lnk);
00237 return;
00238 }
00239
00240 route_item* item = table->insert(remote);
00241
00242
00243 if (item != NULL) {
00244 logging_info("new routing neighbor: " << remote.toString()
00245 << " with link " << lnk.toString());
00246
00247 if (!item->info.isUnspecified() || item->info!=lnk)
00248 baseoverlay.dropLink(item->info);
00249 item->info = lnk;
00250
00251
00252 showLinks();
00253 } else {
00254 logging_info("new orphan: " << remote.toString()
00255 << " with link " << lnk.toString());
00256 table->insert_orphan(remote)->info = lnk;
00257 }
00258
00259
00260 vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
00261 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
00262 }
00263
00265 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
00266 logging_debug("link_down: link=" << lnk.toString() << " remote=" <<
00267 remote.toString() );
00268
00269
00270 route_item* item = table->get(remote);
00271 if (item!=NULL && item->info==lnk) {
00272 item->info = LinkID::UNSPECIFIED;
00273 table->remove(remote);
00274 }
00275 }
00276
00279 void Chord::onMessage(const DataMessage& msg, const NodeID& remote,
00280 const LinkID& link) {
00281
00282
00283 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
00284 if (m == NULL) return;
00285
00286
00287 switch (m->getType()) {
00288
00289
00290 case typeDiscovery: {
00291
00292 Discovery* dmsg = m->decapsulate<Discovery> ();
00293 logging_debug("Received discovery message with"
00294 << " src=" << m->getSourceNode().toString()
00295 << " dst=" << m->getDestinationNode().toString()
00296 << " ttl=" << (int)dmsg->getTTL()
00297 << " type=" << (int)dmsg->getType()
00298 );
00299
00300
00301 bool found = false;
00302 BOOST_FOREACH( NodeID& value, discovery )
00303 if (value == m->getSourceNode()) {
00304 found = true;
00305 break;
00306 }
00307 if (!found) discovery.push_back(m->getSourceNode());
00308
00309
00310 if (m->getSourceNode() != nodeid)
00311 setup( dmsg->getEndpoint(), m->getSourceNode() );
00312
00313
00314 switch (dmsg->getType()) {
00315
00316
00317 case Discovery::normal: {
00318
00319 if ( table->is_closest_to(m->getDestinationNode()) ) {
00320 logging_debug("Discovery split:");
00321 if (!table->get_successor()->isUnspecified()) {
00322 OverlayMsg omsg(*m);
00323 dmsg->setType(Discovery::successor);
00324 omsg.encapsulate(dmsg);
00325 logging_debug("* Routing to successor "
00326 << table->get_successor()->toString() );
00327 baseoverlay.send( &omsg, *table->get_successor() );
00328 }
00329
00330
00331 if (!table->get_predesessor()->isUnspecified()) {
00332 OverlayMsg omsg(*m);
00333 dmsg->setType(Discovery::predecessor);
00334 omsg.encapsulate(dmsg);
00335 logging_debug("* Routing to predecessor "
00336 << table->get_predesessor()->toString() );
00337 baseoverlay.send( &omsg, *table->get_predesessor() );
00338 }
00339 }
00340
00341 else {
00342 baseoverlay.route( m );
00343 }
00344 break;
00345 }
00346
00347
00348 case Discovery::successor:
00349 case Discovery::predecessor: {
00350
00351 if (m->getDestinationNode() != nodeid) {
00352 OverlayMsg omsg(*m);
00353 omsg.encapsulate(dmsg);
00354 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
00355 baseoverlay.route( &omsg );
00356 break;
00357 }
00358
00359
00360 if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break;
00361
00362
00363 dmsg->setTTL(dmsg->getTTL() - 1);
00364
00365 const route_item* item = NULL;
00366 if (dmsg->getType() == Discovery::successor &&
00367 table->get_successor() != NULL) {
00368 item = table->get(*table->get_successor());
00369 } else {
00370 if (table->get_predesessor()!=NULL)
00371 item = table->get(*table->get_predesessor());
00372 }
00373 if (item == NULL)
00374 break;
00375
00376 logging_debug("Routing discovery message to succ/pred "
00377 << item->id.toString() );
00378 OverlayMsg omsg(*m);
00379 omsg.encapsulate(dmsg);
00380 omsg.setDestinationNode(item->id);
00381 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
00382 baseoverlay.send(&omsg, omsg.getDestinationNode());
00383 break;
00384 }}
00385 delete dmsg;
00386 break;
00387 }
00388
00389
00390 case typeLeave: {
00391 if (link!=LinkID::UNSPECIFIED) {
00392 route_item* item = table->get(remote);
00393 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
00394 table->remove(remote);
00395 baseoverlay.dropLink(link);
00396 }
00397 break;
00398 }}
00399 }
00400
00401 void Chord::eventFunction() {
00402 stabilize_counter++;
00403 if (stabilize_counter < 0 || stabilize_counter == 2) {
00404
00405
00406 stabilize_counter = 0;
00407
00408
00409 pending.clear();
00410
00411
00412 size_t numNeighbors = 0;
00413 for (size_t i = 0; i < table->size(); i++) {
00414 route_item* it = (*table)[i];
00415 if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
00416 }
00417 logging_info("Running stabilization: #links="
00418 << table->size() << " #neighbors=" << numNeighbors );
00419
00420
00421 logging_debug("Discover new ring neighbors");
00422 for (int i=0; i<table->size(); i++) {
00423 LinkID id = (*table)[i]->info;
00424 if (!id.isUnspecified()) discover_neighbors(id);
00425 }
00426
00427
00428 logging_debug("Sending discovery message to my neighbors and fingers");
00429 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
00430 const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
00431 if (disc != nodeid)
00432 send_discovery_to(disc);
00433
00434
00435 orphan_removal_counter++;
00436 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
00437 logging_info("Discovered nodes: ");
00438 BOOST_FOREACH( NodeID& id, discovery )
00439 logging_info("* " << id.toString());
00440 discovery.clear();
00441 logging_info("Running orphan removal");
00442 orphan_removal_counter = 0;
00443 for (size_t i = 0; i < table->size(); i++) {
00444 route_item* it = (*table)[i];
00445 if (it->ref_count == 0 && !it->info.isUnspecified()) {
00446 logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
00447 table->insert(it->id);
00448 if (it->ref_count==0) {
00449 LinkID id = it->info;
00450 it->info = LinkID::UNSPECIFIED;
00451 baseoverlay.dropLink(id);
00452 }
00453 }
00454 }
00455 }
00456 }
00457 }
00458
00459 void Chord::showLinks() {
00460 logging_info("--- chord routing information ----------------------------------");
00461 logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
00462 table->get_predesessor()->toString()) );
00463 logging_info("node_id : " << nodeid.toString() );
00464 logging_info("successor : " << (table->get_successor()==NULL? "<none>" :
00465 table->get_successor()->toString()));
00466 logging_info("----------------------------------------------------------------");
00467 }
00468
00469 }}