// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "ariba/overlay/BaseOverlay.h" #include "Chord.h" #include "messages/ChordMessage.h" #include "messages/Discovery.h" #include "detail/chord_routing_table.hpp" namespace ariba { namespace overlay { typedef chord_routing_table::item route_item; use_logging_cpp( Chord ) ; Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, OverlayStructureEvents* _eventsReceiver) : OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver) { // create routing table this->table = new chord_routing_table(_nodeid, 2); orphan_removal_counter = 0; stabilize_counter = 0; stabilize_finger = 0; bootstrapLink = LinkID::UNSPECIFIED; } Chord::~Chord() { // delete routing table delete table; } /// helper: sets up a link using the base overlay LinkID Chord::setup(const EndpointDescriptor& endp) { logging_debug("request to setup link to " << endp.toString() ); // establish link via base overlay return baseoverlay.establishLink(endp, OverlayInterface::OVERLAY_SERVICE_ID); } /// helper: sends a message using the "base overlay" seqnum_t Chord::send(Message* msg, const LinkID& link) { if (link.isUnspecified()) return 0; return baseoverlay.sendMessage(msg, link); } /// sends a discovery message void Chord::send_discovery_to(const NodeID& destination, int ttl) { logging_debug("Initiating discovery of " << destination.toString() ); Message msg; ChordMessage cmsg(ChordMessage::discovery, nodeid, destination); Discovery dmsg; dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); dmsg.setFollowType(Discovery::normal); dmsg.setTTL((uint8_t) ttl); cmsg.encapsulate(&dmsg); msg.encapsulate(&cmsg); logging_debug("" << (int)cmsg.getType()); this->onMessage(&msg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); } void Chord::createOverlay() { } void Chord::deleteOverlay() { } void Chord::joinOverlay(const EndpointDescriptor& boot) { logging_info( "joining Chord overlay structure through end-point " << (boot == EndpointDescriptor::UNSPECIFIED ? "local" : boot.toString()) ); // initiator? no->setup first link if (!(boot == EndpointDescriptor::UNSPECIFIED)) bootstrapLink = setup(boot); // timer for stabilization management Timer::setInterval(2500); Timer::start(); } void Chord::leaveOverlay() { Timer::stop(); for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; ChordMessage msg(ChordMessage::leave, nodeid, it->id); send(&msg,it->info); } } const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { const route_item* item = table->get(node); if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED; return baseoverlay.getEndpointDescriptor(item->info); } void Chord::routeMessage(const NodeID& destnode, Message* msg) { // get next hop const route_item* item = table->get_next_hop(destnode); // message for this node? yes-> delegate to base overlay if (item->id == nodeid) baseoverlay.incomingRouteMessage(msg); else { // no-> send to next hop ChordMessage cmsg(ChordMessage::route, nodeid, destnode); cmsg.encapsulate(msg); send(&cmsg, item->info); } } OverlayInterface::NodeList Chord::getKnownNodes() const { OverlayInterface::NodeList nodelist; for (size_t i = 0; i < table->size(); i++) if ((*table)[i]->ref_count != 0) nodelist.push_back((*table)[i]->id); return nodelist; } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { logging_debug("link_up: link=" << lnk.toString() << " remote=" << remote.toString() ); route_item* item = table->insert(remote); if (!bootstrapLink.isUnspecified() && lnk == bootstrapLink) { send_discovery_to(nodeid); bootstrapLink = LinkID::UNSPECIFIED; } // item added to routing table? if (item != NULL) { // yes-> add to routing table logging_debug("new routing neighbor: " << remote.toString() ); item->info = lnk; } else { // no-> add orphan entry to routing table logging_debug("new orphan: " << remote.toString() ); table->insert_orphan(remote)->info = lnk; } } /// @see CommunicationListener.h or @see OverlayInterface.h void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { logging_debug("link_down: link=" << lnk.toString() << " remote=" << remote.toString() ); // remove link from routing table table->remove(remote); } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& link) { // decode message typedef ChordMessage M; M* m = msg.getMessage()->convert (); if (m == NULL) return; // handle messages switch (m->getType()) { // invalid message case M::invalid: break; // route message with payload case M::route: { // find next hop const route_item* item = table->get_next_hop(m->getDestination()); // next hop == myself? if (m->getDestination() == nodeid) { // yes-> route to base overlay logging_debug("send message to baseoverlay"); baseoverlay.incomingRouteMessage(m); } // no-> route to next hop else { logging_debug("route chord message to " << item->id.toString() ); send(m, item->info); } break; } // discovery request case M::discovery: { // decapsulate message Discovery* dmsg = m->decapsulate (); logging_debug("received discovery message with" << " dest=" << m->getDestination().toString() << " ttl=" << (int)dmsg->getTTL() << " type=" << (int)dmsg->getFollowType() ); // check if source node can be added to routing table and setup link if (m->getSource() != nodeid && table->is_insertable(m->getSource())) setup( *dmsg->getSourceEndpoint()); // delegate discovery message switch (dmsg->getFollowType()) { // normal: route discovery message like every other message case Discovery::normal: // closest node? yes-> split to follow successor and predecessor if (table->is_closest_to(m->getDestination())) { if (table->get_successor() != NULL) { // send successor message ChordMessage cmsg_s(*m); Discovery dmsg_s(*dmsg); dmsg_s.setFollowType(Discovery::successor); cmsg_s.encapsulate(&dmsg_s); route_item* succ_item = table->get(*table->get_successor()); logging_debug("split: routing discovery message to successor " << succ_item->id.toString() ); send(&cmsg_s, succ_item->info); } // send predecessor message if (table->get_predesessor() != NULL) { ChordMessage cmsg_p(*m); Discovery dmsg_p(*dmsg); dmsg_p.setFollowType(Discovery::predecessor); cmsg_p.encapsulate(&dmsg_p); route_item* pred_item = table->get( *table->get_predesessor()); logging_debug("split: routing discovery message to predecessor " << pred_item->id.toString() ); send(&cmsg_p, pred_item->info); } } // no-> route message else { // find next hop const route_item* item = table->get_next_hop( m->getDestination()); if (item->id == nodeid) break; logging_debug("routing discovery message to " << item->id.toString() ); send(m, item->info); } break; // successor mode: follow the successor until TTL is zero case Discovery::successor: case Discovery::predecessor: { // time to live ended? yes-> stop routing if (dmsg->getTTL() == 0) break; // decrease time-to-live dmsg->setTTL(dmsg->getTTL() - 1); const route_item* item; if (dmsg->getFollowType() == Discovery::successor) item = table->get(*table->get_successor()); else item = table->get(*table->get_predesessor()); logging_debug("routing discovery message to succ/pred " << item->id.toString() ); ChordMessage cmsg(*m); cmsg.encapsulate(dmsg); send(&cmsg, item->info); break; } } break; } // leave case M::leave: { baseoverlay.dropLink(link); break; } } } void Chord::eventFunction() { stabilize_counter++; if (stabilize_counter == 3) { stabilize_counter = 0; stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); logging_debug("sending discovery message to my neighbors"); send_discovery_to(nodeid); send_discovery_to( table->get_finger_table(stabilize_finger).get_compare().get_center() ); orphan_removal_counter++; if (orphan_removal_counter == 2) { orphan_removal_counter = 0; for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; if (it->ref_count == 0) { baseoverlay.dropLink(it->info); logging_debug("dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); } } } } logging_debug("--- chord routing information ----------------------------------"); logging_debug("predecessor: " << (table->get_predesessor()==NULL? "" : table->get_predesessor()->toString()) ); logging_debug("node_id : " << nodeid.toString() ); logging_debug("successor : " << (table->get_successor()==NULL? "" : table->get_successor()->toString())); logging_debug("----------------------------------------------------------------"); } } } // namespace ariba, overlay