// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "ariba/overlay/BaseOverlay.h" #include "ariba/overlay/messages/OverlayMsg.h" #include "Chord.h" #include "detail/chord_routing_table.hpp" #include "messages/Discovery.h" namespace ariba { namespace overlay { enum signalMessageTypes { typeDiscovery = OverlayMsg::typeSignalingStart + 0x01, typeLeave = OverlayMsg::typeSignalingStart + 0x02, }; typedef chord_routing_table::item route_item; use_logging_cpp( Chord ); Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) : OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) { // create routing table <<<<<<< .working this->table = new chord_routing_table(_nodeid, 2); ======= this->table = new chord_routing_table(_nodeid, 4); >>>>>>> .merge-rechts.r5869 orphan_removal_counter = 0; discovery_count = 0; stabilize_counter = 0; stabilize_finger = 0; } Chord::~Chord() { // delete routing table delete table; } /// helper: sets up a link using the base overlay LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) { // check if we already have a connection for (int i=0; isize(); i++) if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified())) return LinkID::UNSPECIFIED; // check if we are already trying to establish a link for (size_t i=0; i>>>>>> .merge-rechts.r5869 } /// helper: sends a message using the "base overlay" seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) { if (link.isUnspecified()) return 0; msg->setRelayed(true); return baseoverlay.send_link( msg, link ); } /// sends a discovery message void Chord::send_discovery_to(const NodeID& remote, int ttl) { LinkID link = getNextLinkId(remote); if ( remote == nodeid || link.isUnspecified()) return; if ( table->size() == 0 ) return; OverlayMsg msg( typeDiscovery ); msg.setRelayed(true); Discovery dmsg( Discovery::normal, (uint8_t)2, baseoverlay.getEndpointDescriptor() ); msg.encapsulate(&dmsg); <<<<<<< .working // get next hop const route_item* item = table->get_next_hop(destination); if (item!=NULL && !item->info.isUnspecified()) send(&cmsg, item->info); ======= // send to node baseoverlay.send_node( &msg, remote ); >>>>>>> .merge-rechts.r5869 } void Chord::discover_neighbors( const LinkID& link ) { uint8_t ttl = 2; { // send predecessor discovery OverlayMsg msg( typeDiscovery ); msg.setRelayed(true); Discovery dmsg( Discovery::predecessor, ttl, baseoverlay.getEndpointDescriptor() ); msg.encapsulate(&dmsg); send(&msg, link); } { // send successor discovery OverlayMsg msg( typeDiscovery ); msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); msg.setRelayed(true); Discovery dmsg( Discovery::successor, ttl, baseoverlay.getEndpointDescriptor() ); msg.encapsulate(&dmsg); send(&msg, link); } } void Chord::createOverlay() { } void Chord::deleteOverlay() { } void Chord::joinOverlay(const EndpointDescriptor& boot) { logging_info( "joining Chord overlay structure through end-point " << (boot.isUnspecified() ? "local" : boot.toString()) ); // initiator? no->setup first link if (!boot.isUnspecified()) bootstrapLinks.push_back( setup(boot) ); // timer for stabilization management Timer::setInterval(1000); Timer::start(); } void Chord::leaveOverlay() { Timer::stop(); for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; OverlayMsg msg( typeLeave ); send( &msg, it->info ); } } /// @see OverlayInterface.h const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { const route_item* item = table->get(node); if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); return baseoverlay.getEndpointDescriptor(item->info); } /// @see OverlayInterface.h const LinkID& Chord::getNextLinkId( const NodeID& id ) const { // get next hop <<<<<<< .working const route_item* item = table->get_next_hop(destnode); ======= const route_item* item = table->get_next_hop(id); >>>>>>> .merge-rechts.r5869 <<<<<<< .working // message for this node? yes-> delegate to base overlay if (item->id == nodeid || destnode == nodeid) baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); ======= // returns a unspecified id when this is itself if (item == NULL || item->id == nodeid) return LinkID::UNSPECIFIED; >>>>>>> .merge-rechts.r5869 <<<<<<< .working else { // no-> send to next hop ChordMessage cmsg(ChordMessage::route, nodeid, destnode); cmsg.encapsulate(msg); send(&cmsg, item->info); } ======= /// return routing info return item->info; >>>>>>> .merge-rechts.r5869 } <<<<<<< .working /// @see OverlayInterface.h void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { logging_debug("Redirect over Chord to node id=" << node.toString() << " link id=" << link.toString() ); ChordMessage cmsg(ChordMessage::route, nodeid, node); cmsg.encapsulate(msg); send(&cmsg, link); } /// @see OverlayInterface.h const LinkID& Chord::getNextLinkId( const NodeID& id ) const { // get next hop const route_item* item = table->get_next_hop(id); // returns a unspecified id when this is itself if (item == NULL || item->id == nodeid) return LinkID::UNSPECIFIED; /// return routing info return item->info; } ======= >>>>>>> .merge-rechts.r5869 OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const { OverlayInterface::NodeList nodelist; if( deep ){ // all nodes that I know, fingers, succ/pred for (size_t i = 0; i < table->size(); i++){ if ((*table)[i]->ref_count != 0 && !(*table)[i]->info.isUnspecified()) nodelist.push_back((*table)[i]->id); } } else { // only succ and pred if( table->get_predesessor() != NULL ){ nodelist.push_back( *(table->get_predesessor()) ); } if( table->get_successor() != NULL ){ OverlayInterface::NodeList::iterator i = std::find( nodelist.begin(), nodelist.end(), *(table->get_successor()) ); if( i == nodelist.end() ) nodelist.push_back( *(table->get_successor()) ); } } return nodelist; } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { logging_info("link_up: link=" << lnk.toString() << " remote=" << remote.toString() ); for (vector::iterator i=pending.begin(); i!=pending.end(); i++) if (*i == remote) { pending.erase(i); break; } /* // check if we already have a connection, yes-> do not handle duplicate! for (int i=0; isize(); i++) if ((*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()) && (*table)[i]->info != lnk) { return; } */ if (remote==nodeid) { baseoverlay.dropLink(lnk); return; } route_item* item = table->insert(remote); // item added to routing table? if (item != NULL) { // yes-> add to routing table logging_info("new routing neighbor: " << remote.toString() << " with link " << lnk.toString()); // replace with new link if (!item->info.isUnspecified() || item->info!=lnk) baseoverlay.dropLink(item->info); item->info = lnk; // discover neighbors of new overlay neighbor discover_neighbors( lnk ); showLinks(); } else { // no-> add orphan entry to routing table logging_info("new orphan: " << remote.toString() << " with link " << lnk.toString()); table->insert_orphan(remote)->info = lnk; } // erase bootstrap link vector::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk); if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); } /// @see CommunicationListener.h or @see OverlayInterface.h void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { logging_debug("link_down: link=" << lnk.toString() << " remote=" << remote.toString() ); // remove link from routing table route_item* item = table->get(remote); if (item!=NULL && item->info==lnk) { item->info = LinkID::UNSPECIFIED; table->remove(remote); } } /// @see CommunicationListener.h /// @see OverlayInterface.h void Chord::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& link) { // decode message OverlayMsg* m = dynamic_cast(msg.getMessage()); if (m == NULL) return; // handle messages switch (m->getType()) { <<<<<<< .working // invalid message case M::invalid: break; // route message with payload case M::route: { // find next hop const route_item* item = table->get_next_hop(m->getDestination()); // next hop == myself? if (m->getDestination() == nodeid) { // yes-> route to base overlay logging_debug("Send message to baseoverlay"); baseoverlay.incomingRouteMessage( m, item->info, remote ); } // no-> route to next hop else { logging_debug("Route chord message to " << item->id.toString() << " (destination=" << m->getDestination() << ")"); send(m, item->info); } break; } // discovery request case M::discovery: { ======= // discovery request case typeDiscovery: { >>>>>>> .merge-rechts.r5869 // decapsulate message Discovery* dmsg = m->decapsulate (); logging_debug("Received discovery message with" << " src=" << m->getSourceNode().toString() << " dst=" << m->getDestinationNode().toString() << " ttl=" << (int)dmsg->getTTL() << " type=" << (int)dmsg->getType() ); // check if source node can be added to routing table and setup link if (m->getSourceNode() != nodeid && table->is_insertable(m->getSourceNode())) setup( dmsg->getEndpoint(), m->getSourceNode() ); // delegate discovery message switch (dmsg->getType()) { // normal: route discovery message like every other message case Discovery::normal: { // closest node? yes-> split to follow successor and predecessor if ( table->is_closest_to(m->getDestinationNode()) ) { if (table->get_successor() != NULL) { OverlayMsg omsg(*m); dmsg->setType(Discovery::successor); omsg.encapsulate(dmsg); route_item* succ_item = table->get(*table->get_successor()); logging_debug("Discovery split: routing discovery message to successor " << succ_item->id.toString() ); <<<<<<< .working send(&cmsg_s, succ_item->info); ======= send(&omsg, succ_item->info); >>>>>>> .merge-rechts.r5869 } // send predecessor message if (table->get_predesessor() != NULL) { OverlayMsg omsg(*m); dmsg->setType(Discovery::predecessor); omsg.encapsulate(dmsg); route_item* pred_item = table->get( *table->get_predesessor()); logging_debug("Discovery split: routing discovery message to predecessor " << pred_item->id.toString() ); <<<<<<< .working send(&cmsg_p, pred_item->info); ======= send( &omsg, pred_item->info); >>>>>>> .merge-rechts.r5869 } } // no-> route message else { <<<<<<< .working // find next hop const route_item* item = table->get_next_hop(m->getDestination()); if (item == NULL || item->id == nodeid) break; logging_debug("routing discovery message to " << item->id.toString() ); send(m, item->info); ======= baseoverlay.send( m, m->getDestinationNode() ); >>>>>>> .merge-rechts.r5869 } break; } // successor mode: follow the successor until TTL is zero case Discovery::successor: case Discovery::predecessor: { // time to live ended? yes-> stop routing if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break; // decrease time-to-live dmsg->setTTL(dmsg->getTTL() - 1); const route_item* item = NULL; if (dmsg->getType() == Discovery::successor && table->get_successor() != NULL) { item = table->get(*table->get_successor()); } else { if (table->get_predesessor()!=NULL) item = table->get(*table->get_predesessor()); } if (item == NULL) break; logging_debug("routing discovery message to succ/pred " << item->id.toString() ); OverlayMsg omsg(*m); omsg.encapsulate(dmsg); omsg.setDestinationNode(item->id); baseoverlay.send(&omsg, omsg.getDestinationNode()); break; }} delete dmsg; break; } // leave case typeLeave: { if (link!=LinkID::UNSPECIFIED) { route_item* item = table->get(remote); if (item!=NULL) item->info = LinkID::UNSPECIFIED; table->remove(remote); baseoverlay.dropLink(link); } break; }} } void Chord::eventFunction() { stabilize_counter++; if (stabilize_counter < 0 || stabilize_counter == 4) { // reset counter stabilize_counter = 0; // clear pending connections pending.clear(); // get number of real neighbors size_t numNeighbors = 0; for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++; } logging_info("Running stabilization: #links=" << table->size() << " #neighbors=" << numNeighbors ); // sending discovery logging_debug("Sending discovery message to my neighbors and fingers"); stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); const NodeID disc1 = nodeid; const NodeID disc2 = table->get_finger_table(stabilize_finger).get_compare().get_center(); send_discovery_to(disc1); if (disc1 != disc2) send_discovery_to(disc2); for (int i=0; isize(); i++) { LinkID id = (*table)[i]->info; if (!id.isUnspecified()) discover_neighbors(id); } // remove orphan links orphan_removal_counter++; <<<<<<< .working if (orphan_removal_counter <0 || orphan_removal_counter >= 4) { ======= if (orphan_removal_counter <0 || orphan_removal_counter >= 2) { >>>>>>> .merge-rechts.r5869 logging_info("Running orphan removal"); orphan_removal_counter = 0; for (size_t i = 0; i < table->size(); i++) { route_item* it = (*table)[i]; if (it->ref_count == 0 && !it->info.isUnspecified()) { logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); table->insert(it->id); if (it->ref_count==0) { LinkID id = it->info; it->info = LinkID::UNSPECIFIED; baseoverlay.dropLink(id); } } } } } } void Chord::showLinks() { logging_info("--- chord routing information ----------------------------------"); logging_info("predecessor: " << (table->get_predesessor()==NULL? "" : table->get_predesessor()->toString()) ); logging_info("node_id : " << nodeid.toString() ); logging_info("successor : " << (table->get_successor()==NULL? "" : table->get_successor()->toString())); logging_info("----------------------------------------------------------------"); } }} // namespace ariba, overlay