// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. 
// [License]

#include "ariba/overlay/BaseOverlay.h"
#include "ariba/overlay/messages/OverlayMsg.h"

#include "Chord.h"
#include "detail/chord_routing_table.hpp"

//#include "messages/Discovery.h"  // XXX DEPRECATED

// NOTE(review): this file uses BOOST_FOREACH, std::find, std::ostringstream
// and std::runtime_error, yet only the quoted headers above are visible.
// Presumably the rest arrive transitively. Angle-bracket text elsewhere in
// this file appears to have been stripped, so angle-bracket #includes may
// have been lost here as well -- confirm against version control.

namespace ariba {
namespace overlay {

/// Chord-specific signalling message types, numbered relative to
/// OverlayMsg::typeSignalingStart.
enum signalMessageTypes {
	typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
	typeLeave = OverlayMsg::typeSignalingStart + 0x02,
};

/// Shorthand for one entry of the Chord routing table.
typedef chord_routing_table::item route_item;

using ariba::transport::system_priority;

use_logging_cpp( Chord );


////// Messages

/**
 * Payload carried by a typeDiscovery signalling message.
 *
 * Wire layout produced by serialize(): one byte `type`, one byte `ttl`,
 * then the serialized `endpoint`.
 */
struct DiscoveryMessage
{
	/**
	 * DiscoveryMessage
	 * - type
	 * - data
	 * - Endpoint
	 */

	// type enum: selects how Chord::onMessage processes/forwards the message
	enum type_ {
		invalid = 0,
		normal = 1,
		successor = 2,
		predecessor = 3
	};

	// data
	uint8_t type;                 // a type_ value, stored as one byte
	uint8_t ttl;                  // decremented by onMessage on each successor/predecessor hop
	EndpointDescriptor endpoint;  // senders fill in their own endpoint descriptor

	/// Serializes the message. The endpoint is serialized first; then two
	/// bytes (type, ttl) are pushed in front of it.
	/// @return the complete serialized payload
	reboost::message_t serialize()
	{
		// serialize endpoint
		reboost::message_t msg = endpoint.serialize();

		// serialize type and ttl (prepended, so they become bytes 0 and 1)
		uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data();
		buff1[0] = type;
		buff1[1] = ttl;

		return msg;
	}

	/// Parses a buffer in the layout written by serialize(). Reads type and
	/// ttl from the first two bytes, then hands the rest of the buffer
	/// (from offset 2) to EndpointDescriptor::deserialize.
	/// @param buff serialized DiscoveryMessage
	/// @return whatever endpoint.deserialize() returns (onMessage ignores it)
	///
	/// NOTE(review): bytes[0] and bytes[1] are read without any length check.
	/// onMessage() passes a received payload straight in here, so a payload
	/// shorter than two bytes would be read out of bounds unless
	/// shared_buffer_t guards against it. Consider validating the buffer size
	/// first and rejecting the message (e.g. leaving type == invalid).
	reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff)
	{
		// deserialize type and ttl
		const uint8_t* bytes = buff.data();
		type = bytes[0];
		ttl = bytes[1];

		// deserialize endpoint
		return endpoint.deserialize(buff(2*sizeof(uint8_t)));
	}
};


/**
 * Creates the Chord overlay structure for node `_nodeid`.
 *
 * Allocates the routing table (deleted again in ~Chord) and zeroes the
 * stabilization and orphan-removal bookkeeping counters.
 */
Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid,
		OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) :
	OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) {

	// create routing table
	// NOTE(review): the meaning of the second argument (4) is defined by
	// chord_routing_table; name it or document it.
	// NOTE(review): `table` is a raw owning pointer. If Chord is copyable
	// (check Chord.h), a copy would delete the same table twice.
	this->table = new chord_routing_table(_nodeid, 4);
	orphan_removal_counter = 0;
	discovery_count = 0;
	stabilize_counter = 0;
	stabilize_finger = 0;
}

/// Releases the routing table allocated in the constructor.
Chord::~Chord() {

	// delete routing table
	delete table;
}

/// helper: sets up a link using the base overlay
///
/// Visible behaviour: returns LinkID::UNSPECIFIED (no new link) when the
/// routing table already holds a referenced entry (ref_count > 0) for
/// `remote` whose link is specified.
LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {

	// check if we already have a connection
	// NOTE(review): "isize()" looks like "i<table->size()" with the
	// "<table->" text stripped; restore from version control.
	for (size_t i=0; isize(); i++)
		if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote &&
				!((*table)[i]->info.isUnspecified()))
			return LinkID::UNSPECIFIED;

	// check if we are already trying to establish a link
	//
	// NOTE(review): SOURCE IS CORRUPTED HERE. The loop header below reads
	// "isize() == 0 )". That looks like the remains of "i<pending.size(); ..."
	// followed, after a lost span, by "if ( table->size() == 0 ) return;".
	// The statements after it assign an undeclared `ttl` and never return a
	// LinkID, so they cannot belong to setup(). They appear to be the tail of
	// the function that sends a "normal" discovery message to `remote`
	// (presumably send_discovery_to(), which eventFunction() calls). The rest
	// of setup() and that function's header are missing, along with anything
	// else defined in between. Restore this region from version control
	// before building.
	for (size_t i=0; isize() == 0 ) return;

	// NOTE(review): overwrites `ttl` unconditionally; this value is what
	// ends up in dmsg.ttl below.
	ttl = 2;

	OverlayMsg msg( typeDiscovery );
	msg.setRegisterRelay(true);

	// create DiscoveryMessage
	DiscoveryMessage dmsg;
	dmsg.type = DiscoveryMessage::normal;
	dmsg.ttl = ttl;
	dmsg.endpoint = baseoverlay.getEndpointDescriptor();

	msg.set_payload_message(dmsg.serialize());

	// send to node; a failed send is only logged, not propagated
	try
	{
		baseoverlay.send_node( &msg, remote, system_priority::OVERLAY );
	}
	catch ( message_not_sent& e )
	{
		logging_warn("Chord: Could not send message to " << remote
				<< ": " << e.what());
	}
}

/**
 * Sends two typeDiscovery messages over `link`: first one of type
 * predecessor, then one of type successor. Both carry ttl 1 and this
 * node's endpoint descriptor.
 * @param link link to the neighbour to query
 */
void Chord::discover_neighbors( const LinkID& link ) {
	uint8_t ttl = 1;

	// FIXME try-catch for the send operations

	// create DiscoveryMessage (reused for both sends; only `type` changes)
	DiscoveryMessage dmsg;
	dmsg.ttl = ttl;
	dmsg.endpoint = baseoverlay.getEndpointDescriptor();
	{
		// send predecessor discovery
		OverlayMsg msg( typeDiscovery );
		msg.setRegisterRelay(true);

		// set type
		dmsg.type = DiscoveryMessage::predecessor;

		// send
		msg.set_payload_message(dmsg.serialize());
		send(&msg, link);
	}
	{
		// send successor discovery
		OverlayMsg msg( typeDiscovery );
//		msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );  // XXX this was redundant, wasn't it?
		msg.setRegisterRelay(true);

		// set type
		dmsg.type = DiscoveryMessage::successor;

		// send
		msg.set_payload_message(dmsg.serialize());
		send(&msg, link);
	}
}

/// Intentionally empty: Chord needs no setup when creating a new overlay.
void Chord::createOverlay() {
}

/// Intentionally empty: Chord needs no teardown when deleting the overlay.
void Chord::deleteOverlay() {

}

/**
 * Joins the Chord ring.
 *
 * If `boot` is specified, a link to it is requested via setup(). The result
 * is remembered in bootstrapLinks; onLinkUp() erases it again once the link
 * is up. In every case the stabilization timer is started.
 * @param boot bootstrap end-point, or unspecified if this node is the initiator
 */
void Chord::joinOverlay(const EndpointDescriptor& boot) {
	logging_info( "joining Chord overlay structure through end-point " <<
			(boot.isUnspecified() ? "local" : boot.toString()) );

	// initiator? no->setup first link
	// NOTE(review): setup() can return LinkID::UNSPECIFIED, which would then
	// be stored in bootstrapLinks as well.
	if (!boot.isUnspecified())
		bootstrapLinks.push_back( setup(boot) );

	// timer for stabilization management
	// (interval units are defined by Timer -- presumably milliseconds; confirm)
//	Timer::setInterval(1000);  // TODO find an appropriate interval!
	Timer::setInterval(10000);  // XXX testing...
	Timer::start();
}

/**
 * Leaves the overlay. Stops the stabilization timer and sends a typeLeave
 * message over the link of every routing-table entry.
 * NOTE(review): entries whose link is unspecified are not skipped here;
 * whether send() ignores them is not visible in this file.
 */
void Chord::leaveOverlay() {
	Timer::stop();
	for (size_t i = 0; i < table->size(); i++) {
		route_item* it = (*table)[i];
		OverlayMsg msg( typeLeave );
		send( &msg, it->info );
	}
}

/// @see OverlayInterface.h
/// Returns the endpoint descriptor of the link used for `node`. Returns
/// EndpointDescriptor::UNSPECIFIED() if `node` is not in the routing table
/// or has no link.
const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
	const route_item* item = table->get(node);
	if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
	return baseoverlay.getEndpointDescriptor(item->info);
}

/// @see OverlayInterface.h
/// Delegates to the routing table's is_closest_to().
bool Chord::isClosestNodeTo( const NodeID& node ) {
	return table->is_closest_to(node);
}

/// @see OverlayInterface.h
/// Returns the link towards the next hop for `id`. Returns
/// LinkID::UNSPECIFIED when there is no next hop or the next hop is this
/// node. The returned reference points into the routing-table entry.
const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
	// get next hop
	const route_item* item = table->get_next_hop(id);

	// returns an unspecified id when the next hop is this node itself
	if (item == NULL || item->id == nodeid) return LinkID::UNSPECIFIED;

	/// return routing info
	return item->info;
}

/**
 * Returns pointers to links towards `id`.
 *  - num == 1:      the single link from getNextLinkId() (this may point to
 *                   LinkID::UNSPECIFIED)
 *  - num == 0 or 2: the links of the routing table's get_next_2_hops(id)
 *  - otherwise:     throws std::runtime_error
 * NOTE(review): "std::vector" has lost its template arguments (presumably
 * <const LinkID*> for the return type and ret, and a route_item pointer
 * type for `items`) -- restore from Chord.h / version control.
 */
std::vector Chord::getSortedLinkIdsTowardsNode(
	const NodeID& id, int num ) const
{
	std::vector ret;

	switch ( num )
	{
		// special case: just call »getNextLinkId«
		case 1:
		{
			ret.push_back(&getNextLinkId(id));

			break;
		}

		// * calculate top 2 *
		case 0:
		case 2:
		{
			std::vector items = table->get_next_2_hops(id);

			ret.reserve(items.size());

			BOOST_FOREACH( const route_item* item, items )
			{
				ret.push_back(&item->info);
			}

			break;
		}

		// NOTE: implement real sorting, if needed (and handle "case 0" properly, then)
		default:
		{
			throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)");

			break;
		}
	}

	return ret;
}


/// @see OverlayInterface.h
/// Returns the node id of the next hop for `id`, or NodeID::UNSPECIFIED if
/// there is none. Unlike getNextLinkId(), this does not special-case the
/// next hop being this node.
const NodeID& Chord::getNextNodeId( const NodeID& id ) const {
	// get next hop
	const route_item* item = table->get_next_hop(id);

	// return unspecified if no next hop could be found
	if (item == NULL) {
		return NodeID::UNSPECIFIED;
	}

	return item->id;
}

/**
 * Lists known overlay nodes.
 * @param deep true: every routing-table entry that is referenced
 *             (ref_count != 0) and has a link; false: only the predecessor
 *             and the successor (without duplicates)
 */
OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
	OverlayInterface::NodeList nodelist;

	if( deep ){
		// all nodes that I know, fingers, succ/pred
		for (size_t i = 0; i < table->size(); i++){
			if ((*table)[i]->ref_count != 0
					&& !(*table)[i]->info.isUnspecified())
				nodelist.push_back((*table)[i]->id);
		}
	} else {
		// only succ and pred
		if( table->get_predesessor() != NULL ){
			nodelist.push_back( *(table->get_predesessor()) );
		}
		if( table->get_successor() != NULL ){
			// avoid a duplicate when successor == predecessor
			OverlayInterface::NodeList::iterator i =
				std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) );
			if( i == nodelist.end() )
				nodelist.push_back( *(table->get_successor()) );
		}
	}

	return nodelist;
}

/// @see CommunicationListener.h
/// @see OverlayInterface.h
///
/// Handles a newly established link `lnk` to `remote`:
///  1. removes `remote` from the pending-connection list;
///  2. drops links to this node itself;
///  3. stores the link in the routing table, or as an orphan entry if the
///     table does not accept `remote`;
///  4. removes `lnk` from bootstrapLinks.
void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
	logging_info("link_up: link=" << lnk.toString() << " remote=" <<
			remote.toString() );
	// NOTE(review): "vector::iterator" has lost its template argument
	// (presumably vector<NodeID>); restore from version control.
	for (vector::iterator i=pending.begin(); i!=pending.end(); i++)
		if (*i == remote) {
			pending.erase(i);  // iterator invalidated; the break below is required
			break;
		}

	if (remote==nodeid) {
		logging_warn("dropping link that has been established to myself (nodes have same nodeid?)");
		logging_warn("NodeID: " << remote);
		baseoverlay.dropLink(lnk);
		return;
	}

	route_item* item = table->insert(remote);

	// item added to routing table?
	if (item != NULL) { // yes-> add to routing table
		logging_info("new routing neighbor: " << remote.toString()
				<< " with link " << lnk.toString());

		// replace with new link if link is "better"
		if (item->info!=lnk && item->info.isUnspecified()==false) {
			if (baseoverlay.compare( item->info, lnk ) == 1) {
				logging_info("Replacing link due to concurrent link establishment.");
				baseoverlay.dropLink(item->info);
				item->info = lnk;
			}
			// NOTE(review): when compare() != 1 the new link `lnk` is neither
			// stored nor dropped here -- presumably the remote side drops it;
			// confirm.
		} else {
			item->info = lnk;
		}

		// discover neighbors of new overlay neighbor
		// (note: only the routing state is logged here; no discovery is sent)
		showLinks();
	} else { // no-> add orphan entry to routing table
		logging_info("new orphan: " << remote.toString()
				<< " with link " << lnk.toString());
		table->insert_orphan(remote)->info = lnk;
	}

	// erase bootstrap link
	// NOTE(review): template argument stripped (presumably vector<LinkID>).
	vector::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
	if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
}

/// @see CommunicationListener.h or @see OverlayInterface.h
/// Clears and removes the routing-table entry for `remote`, but only if that
/// entry currently uses the link that went down.
void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) {
	// XXX logging_debug
	logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" <<
			remote.toString() );

	// remove link from routing table
	route_item* item = table->get(remote);
	if (item!=NULL && item->info==lnk) {
		item->info = LinkID::UNSPECIFIED;
		table->remove(remote);
	}
}

/// @see CommunicationListener.h
/// @see OverlayInterface.h
///
/// Dispatches Chord signalling messages.
///  - typeDiscovery: records the source node in `discovery`, tries to set up
///    a link to it, then handles the DiscoveryMessage by its type
///    (normal / successor / predecessor).
///  - typeLeave: clears and removes the sender's routing-table entry and
///    drops the link the message arrived on.
void Chord::onMessage(OverlayMsg* msg,
		reboost::shared_buffer_t sub_msg,
		const NodeID& remote,
		const LinkID& link) {

	// handle messages
	switch ((signalMessageTypes) msg->getType()) {

	// discovery request
	case typeDiscovery:
	{
		// deserialize discovery message
		// (see the NOTE(review) on DiscoveryMessage::deserialize about length checks)
		DiscoveryMessage dmsg;
		dmsg.deserialize(sub_msg);

		logging_debug("Received discovery message with"
			    << " src=" << msg->getSourceNode().toString()
				<< " dst=" << msg->getDestinationNode().toString()
				<< " ttl=" << (int)dmsg.ttl
				<< " type=" << (int)dmsg.type
		);

		// add discovery node id (linear search keeps `discovery` duplicate-free)
		bool found = false;
		BOOST_FOREACH( NodeID& value, discovery )
			if (value == msg->getSourceNode()) {
				found = true;
				break;
			}
		if (!found) discovery.push_back(msg->getSourceNode());

		// check if source node can be added to routing table and setup link
		if (msg->getSourceNode() != nodeid)
			setup( dmsg.endpoint, msg->getSourceNode() );

		// process discovery message -------------------------- switch start --
		switch ( dmsg.type )
		{
			// normal: route discovery message like every other message
			case DiscoveryMessage::normal:
			{
				// closest node? yes-> split to follow successor and predecessor
				if ( table->is_closest_to(msg->getDestinationNode()) )
				{
					logging_debug("Discovery split:");
					// NOTE(review): get_successor()/get_predesessor() are
					// dereferenced below without a NULL check. The successor/
					// predecessor case further down, getKnownNodes() and
					// showLinks() all treat these pointers as possibly NULL;
					// add "table->get_successor() != NULL &&" (and the same for
					// the predecessor) to avoid a crash.
					if (!table->get_successor()->isUnspecified())
					{
						OverlayMsg omsg(*msg);

						dmsg.type = DiscoveryMessage::successor;
						omsg.set_payload_message(dmsg.serialize());

						logging_debug("* Routing to successor "
								<< table->get_successor()->toString() );
						send_node( &omsg, *table->get_successor() );
					}

					// send predecessor message
					if (!table->get_predesessor()->isUnspecified())
					{
						OverlayMsg omsg(*msg);

						dmsg.type = DiscoveryMessage::predecessor;
						omsg.set_payload_message(dmsg.serialize());

						logging_debug("* Routing to predecessor "
								<< table->get_predesessor()->toString() );
						send_node( &omsg, *table->get_predesessor() );
					}
				}
				// no-> route message
				else
				{
					baseoverlay.route( msg );
				}
				break;
			}

			// successor mode: follow the successor until TTL is zero
			case DiscoveryMessage::successor:
			case DiscoveryMessage::predecessor:
			{
				// reached destination? no->forward!
				if (msg->getDestinationNode() != nodeid)
				{
					OverlayMsg omsg(*msg);
					omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);

					omsg.set_payload_message(dmsg.serialize());

					baseoverlay.route( &omsg );
					break;
				}

				// time to live ended? yes-> stop routing
				// (ttl values above 10 are also discarded)
				if (dmsg.ttl == 0 || dmsg.ttl > 10) break;

				// decrease time-to-live
				dmsg.ttl--;

				const route_item* item = NULL;
				// NOTE(review): a successor-type message whose successor is
				// NULL falls through to the else-if and follows the
				// predecessor instead -- confirm this is intended.
				if (dmsg.type == DiscoveryMessage::successor &&
						table->get_successor() != NULL)
				{
					item = table->get(*table->get_successor());
				}
				else if (table->get_predesessor() != NULL)
				{
					item = table->get(*table->get_predesessor());
				}
				if (item == NULL)
					break;

				logging_debug("Routing discovery message to succ/pred "
					<< item->id.toString() );
				OverlayMsg omsg(*msg);
				omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID);
				omsg.setDestinationNode(item->id);

				omsg.set_payload_message(dmsg.serialize());

				send_node( &omsg, omsg.getDestinationNode() );
				break;
			}
			case DiscoveryMessage::invalid:
				break;

			default:
				break;
		}
		// process discovery message ---------------------------- switch end --

		break;
	}

	// leave
	case typeLeave: {
		if (link!=LinkID::UNSPECIFIED) {
			route_item* item = table->get(remote);
			if (item!=NULL) item->info = LinkID::UNSPECIFIED;
			table->remove(remote);
			baseoverlay.dropLink(link);
		}
		break;
	}
	}
}

/**
 * Periodic maintenance, presumably driven by the Timer started in
 * joinOverlay().
 *
 * A stabilization round runs on every second call (or after the counter
 * wraps negative). Each round:
 *  - clears the pending-connection list;
 *  - runs discover_neighbors() on every routing-table link;
 *  - sends a discovery message towards the centre of the next finger
 *    (fingers are cycled round-robin);
 *  - runs orphan removal on every second round.
 */
void Chord::eventFunction() {
	stabilize_counter++;
	if (stabilize_counter < 0 || stabilize_counter == 2) {

		// reset counter
		stabilize_counter = 0;

		// clear pending connections
		pending.clear();

		// get number of real neighbors
		size_t numNeighbors = 0;
		for (size_t i = 0; i < table->size(); i++) {
			route_item* it = (*table)[i];
			if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++;
		}
		logging_info("Running stabilization: #links="
				<< table->size() << " #neighbors=" << numNeighbors );

		// updating neighbors
		logging_debug("Discover new ring neighbors");
		// NOTE(review): "isize()" is presumably "i<table->size()" with the
		// "<table->" text stripped; restore from version control.
		for (size_t i=0; isize(); i++) {
			LinkID id = (*table)[i]->info;
			if (!id.isUnspecified()) discover_neighbors(id);
		}

		// sending discovery
		logging_debug("Sending discovery message to my neighbors and fingers");
		stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() );
		const NodeID disc = table->get_finger_table(stabilize_finger).get_compare().get_center();
		if (disc != nodeid)
			send_discovery_to(disc);

		// remove orphan links
		orphan_removal_counter++;
		if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
			logging_info("Discovered nodes: ");
			BOOST_FOREACH( NodeID& id, discovery )
				logging_info("* " << id.toString());
			discovery.clear();
			logging_info("Running orphan removal");
			orphan_removal_counter = 0;
			for (size_t i = 0; i < table->size(); i++) {
				route_item* it = (*table)[i];
				if (it->ref_count == 0 && !it->info.isUnspecified()) {
					// NOTE(review): this is logged before the re-insert below,
					// so it may report a drop that then does not happen.
					logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString());
					// Give the orphan one more chance to be adopted by the
					// routing table; drop its link only if still unreferenced.
					// NOTE(review): insert() is called while iterating over the
					// table by index -- confirm chord_routing_table keeps
					// indices/pointers stable across insert().
					table->insert(it->id);
					if (it->ref_count==0) {
						LinkID id = it->info;
						it->info = LinkID::UNSPECIFIED;
						baseoverlay.dropLink(id);
					}
				}
			}
		}
	}
}

/// Logs the predecessor, own node id and successor at info level (an empty
/// string is logged when predecessor/successor are NULL).
/// NOTE(review): the "" placeholders may originally have been an
/// angle-bracketed text such as "<none>" lost to tag stripping -- confirm.
void Chord::showLinks() {
	logging_info("--- chord routing information ----------------------------------");
	logging_info("predecessor: " << (table->get_predesessor()==NULL? "" :
		table->get_predesessor()->toString()) );
	logging_info("node_id : " << nodeid.toString() );
	logging_info("successor : " << (table->get_successor()==NULL? "" :
		table->get_successor()->toString()));
	logging_info("----------------------------------------------------------------");
}

/// @see OverlayInterface.h
/// Returns a multi-line human-readable summary containing:
///  - the protocol, node id, predecessor and successor;
///  - every referenced, linked routing-table entry (node id and link id,
///    each shortened to 6 characters).
std::string Chord::debugInformation() const {
	std::ostringstream s;
	s << "protocol   : Chord" << endl;
	s << "node_id    : " << nodeid.toString() << endl;
	s << "predecessor: " << (table->get_predesessor()==NULL? "" :
		table->get_predesessor()->toString()) << endl;
	s << "successor  : " << (table->get_successor()==NULL? "" :
		table->get_successor()->toString()) << endl;
	s << "nodes: " << endl;
	for (size_t i = 0; i < table->size(); i++) {
		route_item* it = (*table)[i];
		if (it->ref_count != 0 && !it->info.isUnspecified()) {
			s << it->id.toString().substr(0,6)
			  << " using " << it->info.toString().substr(0,6) << endl;
		}
	}
	return s.str();
}



}} // namespace ariba, overlay