// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "ariba/overlay/BaseOverlay.h" #include "Chord.h" #include "messages/ChordMessage.h" #include "messages/Discovery.h" #include "detail/chord_routing_table.hpp" namespace ariba { namespace overlay { typedef chord_routing_table::item route_item; use_logging_cpp( Chord ); Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) : OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) { // create routing table this->table = new chord_routing_table(_nodeid, 2); orphan_removal_counter = 0; stabilize_counter = 0; stabilize_finger = 0; } Chord::~Chord() { // delete routing table delete table; } /// helper: sets up a link using the base overlay LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node) { logging_debug("request to setup link to " << endp.toString() ); for (size_t i=0; ionMessage(&msg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); } void Chord::createOverlay() { } void Chord::deleteOverlay() { } void Chord::joinOverlay(const EndpointDescriptor& boot) { logging_info( "joining Chord overlay structure through end-point " << (boot.isUnspecified() ? "local" : boot.toString()) ); // initiator? no->setup first link if (!boot.isUnspecified()) bootstrapLinks.push_back( setup(boot) ); // timer for stabilization management Timer::setInterval(1000); Timer::start(); } void Chord::leaveOverlay() { Timer::stop(); for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; ChordMessage msg(ChordMessage::leave, nodeid, it->id); send(&msg,it->info); } } const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { const route_item* item = table->get(node); if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); return baseoverlay.getEndpointDescriptor(item->info); } void Chord::routeMessage(const NodeID& destnode, Message* msg) { // get next hop const route_item* item = table->get_next_hop(destnode); // message for this node? yes-> delegate to base overlay if (item->id == nodeid || destnode == nodeid) baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); else { // no-> send to next hop ChordMessage cmsg(ChordMessage::route, nodeid, destnode); cmsg.encapsulate(msg); send(&cmsg, item->info); } } /// @see OverlayInterface.h void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { logging_debug("Redirect over Chord to node id=" << node.toString() << " link id=" << link.toString() ); ChordMessage cmsg(ChordMessage::route, nodeid, node); cmsg.encapsulate(msg); send(&cmsg, link); } /// @see OverlayInterface.h const LinkID& Chord::getNextLinkId( const NodeID& id ) const { // get next hop const route_item* item = table->get_next_hop(id); // returns a unspecified id when this is itself if (item == NULL || item->id == nodeid) return LinkID::UNSPECIFIED; /// return routing info return item->info; } OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const { OverlayInterface::NodeList nodelist; if( deep ){ // all nodes that I know, fingers, succ/pred for (size_t i = 0; i < table->size(); i++){ if ((*table)[i]->ref_count != 0 && !(*table)[i]->info.isUnspecified()) nodelist.push_back((*table)[i]->id); } } else { // only succ and pred if( table->get_predesessor() != NULL ){ nodelist.push_back( *(table->get_predesessor()) ); } if( table->get_successor() != NULL ){ OverlayInterface::NodeList::iterator i = std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) ); if( i == nodelist.end() ) nodelist.push_back( *(table->get_successor()) ); } } return nodelist; } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { logging_debug("link_up: link=" << lnk.toString() << " remote=" << remote.toString() ); for (vector::iterator i=pending.begin(); i!=pending.end(); i++) if (*i == remote) { pending.erase(i); break; } route_item* item = table->insert(remote); // item added to routing table? if (item != NULL) { // yes-> add to routing table logging_info("new routing neighbor: " << remote.toString() << " with link " << lnk.toString()); item->info = lnk; } else { // no-> add orphan entry to routing table logging_info("new orphan: " << remote.toString() << " with link " << lnk.toString()); table->insert_orphan(remote)->info = lnk; } vector::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk); if( it != bootstrapLinks.end() ) { // send discovery over bootstrap Message msg; ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid); Discovery dmsg; dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); dmsg.setFollowType(Discovery::normal); dmsg.setTTL((uint8_t) 4); cmsg.encapsulate(&dmsg); msg.encapsulate(&cmsg); send(&msg, lnk); bootstrapLinks.erase( it ); } } /// @see CommunicationListener.h or @see OverlayInterface.h void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { logging_debug("link_down: link=" << lnk.toString() << " remote=" << remote.toString() ); // remove link from routing table route_item* item = table->get(remote); if (item!=NULL) item->info = LinkID::UNSPECIFIED; table->remove(remote); } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& link) { // decode message typedef ChordMessage M; M* m = msg.getMessage()->convert (); if (m == NULL) return; // handle messages switch (m->getType()) { // invalid message case M::invalid: break; // route message with payload case M::route: { // find next hop const route_item* item = table->get_next_hop(m->getDestination()); // next hop == myself? if (m->getDestination() == nodeid) { // yes-> route to base overlay logging_debug("Send message to baseoverlay"); baseoverlay.incomingRouteMessage( m, item->info, remote ); } // no-> route to next hop else { logging_debug("Route chord message to " << item->id.toString() << " (destination=" << m->getDestination() << ")"); send(m, item->info); } break; } // discovery request case M::discovery: { // decapsulate message Discovery* dmsg = m->decapsulate (); logging_debug("received discovery message with" << " dest=" << m->getDestination().toString() << " ttl=" << (int)dmsg->getTTL() << " type=" << (int)dmsg->getFollowType() ); // check if source node can be added to routing table and setup link if (m->getSource() != nodeid && table->is_insertable(m->getSource())) setup(*dmsg->getSourceEndpoint(), m->getSource() ); // delegate discovery message switch (dmsg->getFollowType()) { // normal: route discovery message like every other message case Discovery::normal: // closest node? yes-> split to follow successor and predecessor if (table->is_closest_to(m->getDestination())) { if (table->get_successor() != NULL) { // send successor message ChordMessage cmsg_s(*m); Discovery dmsg_s(*dmsg); dmsg_s.setFollowType(Discovery::successor); cmsg_s.encapsulate(&dmsg_s); route_item* succ_item = table->get(*table->get_successor()); logging_debug("split: routing discovery message to successor " << succ_item->id.toString() ); send(&cmsg_s, succ_item->info); } // send predecessor message if (table->get_predesessor() != NULL) { ChordMessage cmsg_p(*m); Discovery dmsg_p(*dmsg); dmsg_p.setFollowType(Discovery::predecessor); cmsg_p.encapsulate(&dmsg_p); route_item* pred_item = table->get( *table->get_predesessor()); logging_debug("split: routing discovery message to predecessor " << pred_item->id.toString() ); send(&cmsg_p, pred_item->info); } } // no-> route message else { // find next hop const route_item* item = table->get_next_hop( m->getDestination()); if (item->id == nodeid) break; logging_debug("routing discovery message to " << item->id.toString() ); ChordMessage cmsg_p(*m); Discovery dmsg_p(*dmsg); cmsg_p.encapsulate(&dmsg_p); send(&cmsg_p, item->info); } break; // successor mode: follow the successor until TTL is zero case Discovery::successor: case Discovery::predecessor: { // time to live ended? yes-> stop routing if (dmsg->getTTL() == 0) break; // decrease time-to-live dmsg->setTTL(dmsg->getTTL() - 1); const route_item* item = NULL; if (dmsg->getFollowType() == Discovery::successor && table->get_successor() != NULL) { item = table->get(*table->get_successor()); } else { if (table->get_predesessor()!=NULL) item = table->get(*table->get_predesessor()); } if (item == NULL) break; logging_debug("routing discovery message to succ/pred " << item->id.toString() ); ChordMessage cmsg(*m); cmsg.encapsulate(dmsg); send(&cmsg, item->info); break; } } break; } // leave case M::leave: { if (link!=LinkID::UNSPECIFIED) { route_item* item = table->get(remote); if (item!=NULL) item->info = LinkID::UNSPECIFIED; table->remove(remote); baseoverlay.dropLink(link); } break; } } } void Chord::eventFunction() { stabilize_counter++; if (stabilize_counter == 3) { pending.clear(); size_t numNeighbors = 0; for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++; } logging_info("Running stabilization: #links=" << table->size() << " #neighbors=" << numNeighbors ); stabilize_counter = 0; stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); logging_debug("Sending discovery message to my neighbors and fingers"); const NodeID& disc1 = nodeid; const NodeID& disc2 = table->get_finger_table(stabilize_finger).get_compare().get_center(); send_discovery_to(disc1); if (disc1 != disc2) send_discovery_to(disc2); orphan_removal_counter++; if (orphan_removal_counter >= 2) { logging_info("Running orphan removal"); orphan_removal_counter = 0; for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; if (it->ref_count == 0 && !it->info.isUnspecified()) { logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); baseoverlay.dropLink(it->info); it->info = LinkID::UNSPECIFIED; } } } } logging_debug("--- chord routing information ----------------------------------"); logging_debug("predecessor: " << (table->get_predesessor()==NULL? "" : table->get_predesessor()->toString()) ); logging_debug("node_id : " << nodeid.toString() ); logging_debug("successor : " << (table->get_successor()==NULL? "" : table->get_successor()->toString())); logging_debug("----------------------------------------------------------------"); } }} // namespace ariba, overlay