00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "ariba/overlay/BaseOverlay.h"
00040
00041 #include "Chord.h"
00042 #include "messages/ChordMessage.h"
00043 #include "messages/Discovery.h"
00044
00045 #include "detail/chord_routing_table.hpp"
00046
00047 namespace ariba {
00048 namespace overlay {
00049
00050 typedef chord_routing_table::item route_item;
00051
00052 use_logging_cpp( Chord );
00053
00054 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
00055 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
00056 OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {
00057
00058
00059 this->table = new chord_routing_table(_nodeid, 2);
00060 orphan_removal_counter = 0;
00061 stabilize_counter = 0;
00062 stabilize_finger = 0;
00063 bootstrapLink = LinkID::UNSPECIFIED;
00064 }
00065
00066 Chord::~Chord() {
00067
00068
00069 delete table;
00070 }
00071
00073 LinkID Chord::setup(const EndpointDescriptor& endp) {
00074
00075 logging_debug("request to setup link to " << endp.toString() );
00076
00077 return baseoverlay.establishLink(endp, OverlayInterface::OVERLAY_SERVICE_ID);
00078 }
00079
00081 seqnum_t Chord::send(Message* msg, const LinkID& link) {
00082 if (link.isUnspecified()) return 0;
00083 return baseoverlay.sendMessage(msg, link);
00084 }
00085
00087 void Chord::send_discovery_to(const NodeID& destination, int ttl) {
00088 logging_debug("Initiating discovery of " << destination.toString() );
00089 Message msg;
00090 ChordMessage cmsg(ChordMessage::discovery, nodeid, destination);
00091 Discovery dmsg;
00092 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
00093 dmsg.setFollowType(Discovery::normal);
00094 dmsg.setTTL((uint8_t) ttl);
00095 cmsg.encapsulate(&dmsg);
00096 msg.encapsulate(&cmsg);
00097 this->onMessage(&msg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
00098 }
00099
00100 void Chord::createOverlay() {
00101 }
00102
00103 void Chord::deleteOverlay() {
00104
00105 }
00106
00107 void Chord::joinOverlay(const EndpointDescriptor& boot) {
00108 logging_info( "joining Chord overlay structure through end-point " <<
00109 (boot == EndpointDescriptor::UNSPECIFIED ?
00110 "local" : boot.toString()) );
00111
00112
00113 if (!(boot == EndpointDescriptor::UNSPECIFIED)) bootstrapLink = setup(boot);
00114
00115
00116 Timer::setInterval(2500);
00117 Timer::start();
00118 }
00119
00120 void Chord::leaveOverlay() {
00121 Timer::stop();
00122 for (size_t i = 0; i < table->size(); i++) {
00123 route_item* it = (*table)[i];
00124 ChordMessage msg(ChordMessage::leave, nodeid, it->id);
00125 send(&msg,it->info);
00126 }
00127 }
00128
00129 const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
00130 const route_item* item = table->get(node);
00131 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED;
00132 return baseoverlay.getEndpointDescriptor(item->info);
00133 }
00134
00135 void Chord::routeMessage(const NodeID& destnode, Message* msg) {
00136
00137 const route_item* item = table->get_next_hop(destnode);
00138
00139
00140 if (item->id == nodeid) baseoverlay.incomingRouteMessage(msg);
00141 else {
00142 ChordMessage cmsg(ChordMessage::route, nodeid, destnode);
00143 cmsg.encapsulate(msg);
00144 send(&cmsg, item->info);
00145 }
00146 }
00147
00148 OverlayInterface::NodeList Chord::getKnownNodes() const {
00149 OverlayInterface::NodeList nodelist;
00150 for (size_t i = 0; i < table->size(); i++)
00151 if ((*table)[i]->ref_count != 0
00152 && !(*table)[i]->info.isUnspecified())
00153 nodelist.push_back((*table)[i]->id);
00154 return nodelist;
00155 }
00156
00159 void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
00160 logging_debug("link_up: link=" << lnk.toString() << " remote=" <<
00161 remote.toString() );
00162 route_item* item = table->insert(remote);
00163
00164
00165 if (item != NULL) {
00166 logging_info("new routing neighbor: " << remote.toString()
00167 << " with link " << lnk.toString());
00168 item->info = lnk;
00169 } else {
00170 logging_info("new orphan: " << remote.toString()
00171 << " with link " << lnk.toString());
00172 table->insert_orphan(remote)->info = lnk;
00173 }
00174
00175 if (!bootstrapLink.isUnspecified() && lnk == bootstrapLink) {
00176 send_discovery_to(nodeid);
00177 bootstrapLink = LinkID::UNSPECIFIED;
00178 }
00179 }
00180
00182 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
00183 logging_debug("link_down: link=" << lnk.toString() << " remote=" <<
00184 remote.toString() );
00185
00186
00187 route_item* item = table->get(remote);
00188 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
00189 table->remove(remote);
00190 }
00191
00194 void Chord::onMessage(const DataMessage& msg, const NodeID& remote,
00195 const LinkID& link) {
00196
00197
00198 typedef ChordMessage M;
00199 M* m = msg.getMessage()->convert<ChordMessage> ();
00200 if (m == NULL) return;
00201
00202
00203 switch (m->getType()) {
00204
00205
00206 case M::invalid:
00207 break;
00208
00209
00210 case M::route: {
00211
00212 const route_item* item = table->get_next_hop(m->getDestination());
00213
00214
00215 if (m->getDestination() == nodeid) {
00216 logging_debug("send message to baseoverlay");
00217 baseoverlay.incomingRouteMessage(m);
00218 }
00219
00220 else {
00221 logging_debug("route chord message to " << item->id.toString() );
00222 send(m, item->info);
00223 }
00224 break;
00225 }
00226
00227
00228 case M::discovery: {
00229
00230 Discovery* dmsg = m->decapsulate<Discovery> ();
00231 logging_debug("received discovery message with"
00232 << " dest=" << m->getDestination().toString()
00233 << " ttl=" << (int)dmsg->getTTL()
00234 << " type=" << (int)dmsg->getFollowType()
00235 );
00236
00237
00238 if (m->getSource() != nodeid && table->is_insertable(m->getSource())) setup(
00239 *dmsg->getSourceEndpoint());
00240
00241
00242 switch (dmsg->getFollowType()) {
00243
00244
00245 case Discovery::normal:
00246
00247 if (table->is_closest_to(m->getDestination())) {
00248
00249 if (table->get_successor() != NULL) {
00250
00251 ChordMessage cmsg_s(*m);
00252 Discovery dmsg_s(*dmsg);
00253 dmsg_s.setFollowType(Discovery::successor);
00254 cmsg_s.encapsulate(&dmsg_s);
00255 route_item* succ_item = table->get(*table->get_successor());
00256 logging_debug("split: routing discovery message to successor "
00257 << succ_item->id.toString() );
00258 send(&cmsg_s, succ_item->info);
00259 }
00260
00261
00262 if (table->get_predesessor() != NULL) {
00263 ChordMessage cmsg_p(*m);
00264 Discovery dmsg_p(*dmsg);
00265 dmsg_p.setFollowType(Discovery::predecessor);
00266 cmsg_p.encapsulate(&dmsg_p);
00267 route_item* pred_item = table->get(
00268 *table->get_predesessor());
00269 logging_debug("split: routing discovery message to predecessor "
00270 << pred_item->id.toString() );
00271 send(&cmsg_p, pred_item->info);
00272 }
00273 }
00274
00275 else {
00276
00277 const route_item* item = table->get_next_hop(
00278 m->getDestination());
00279 if (item->id == nodeid) break;
00280 logging_debug("routing discovery message to " <<
00281 item->id.toString() );
00282 send(m, item->info);
00283 }
00284 break;
00285
00286
00287 case Discovery::successor:
00288 case Discovery::predecessor: {
00289
00290 if (dmsg->getTTL() == 0) break;
00291
00292
00293 dmsg->setTTL(dmsg->getTTL() - 1);
00294
00295 const route_item* item = NULL;
00296 if (dmsg->getFollowType() == Discovery::successor &&
00297 table->get_successor() != NULL) {
00298 item = table->get(*table->get_successor());
00299 } else {
00300 if (table->get_predesessor()!=NULL)
00301 item = table->get(*table->get_predesessor());
00302 }
00303 if (item == NULL) break;
00304 logging_debug("routing discovery message to succ/pred "
00305 << item->id.toString() );
00306 ChordMessage cmsg(*m);
00307 cmsg.encapsulate(dmsg);
00308 send(&cmsg, item->info);
00309 break;
00310 }
00311 }
00312 break;
00313 }
00314
00315
00316 case M::leave: {
00317 if (link!=LinkID::UNSPECIFIED) {
00318 route_item* item = table->get(remote);
00319 if (item!=NULL) item->info = LinkID::UNSPECIFIED;
00320 table->remove(remote);
00321 baseoverlay.dropLink(link);
00322 }
00323 break;
00324 }
00325 }
00326 }
00327
00328 void Chord::eventFunction() {
00329 if (!LinkID::UNSPECIFIED.isUnspecified())
00330 logging_error("LinkID::UNSPECIFIED not unspecified!!!!");
00331 stabilize_counter++;
00332 if (stabilize_counter == 3) {
00333 size_t numNeighbors = 0;
00334 for (size_t i = 0; i < table->size(); i++) {
00335 route_item* it = (*table)[i];
00336 if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
00337 }
00338 logging_info("Running stabilization: #links="
00339 << table->size() << " #neighbors=" << numNeighbors );
00340 stabilize_counter = 0;
00341 stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
00342 logging_debug("Sending discovery message to my neighbors and fingers");
00343 const NodeID& disc1 = nodeid;
00344 const NodeID& disc2 = table->get_finger_table(stabilize_finger).get_compare().get_center();
00345 send_discovery_to(disc1);
00346 if (disc1 != disc2) send_discovery_to(disc2);
00347 orphan_removal_counter++;
00348 if (orphan_removal_counter == 2) {
00349 logging_info("Running orphan removal");
00350 orphan_removal_counter = 0;
00351 for (size_t i = 0; i < table->size(); i++) {
00352 route_item* it = (*table)[i];
00353 if (it->ref_count == 0 && !it->info.isUnspecified()) {
00354 logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
00355 baseoverlay.dropLink(it->info);
00356 it->info = LinkID::UNSPECIFIED;
00357 }
00358 }
00359 }
00360 }
00361 logging_debug("--- chord routing information ----------------------------------");
00362 logging_debug("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
00363 table->get_predesessor()->toString()) );
00364 logging_debug("node_id : " << nodeid.toString() );
00365 logging_debug("successor : " << (table->get_successor()==NULL? "<none>" :
00366 table->get_successor()->toString()));
00367 logging_debug("----------------------------------------------------------------");
00368 }
00369
00370 }
00371 }