// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. 
// [License] #include "BaseOverlay.h" #include #include #include #include #include "ariba/NodeListener.h" #include "ariba/CommunicationListener.h" #include "ariba/SideportListener.h" #include "ariba/overlay/LinkDescriptor.h" #include "ariba/overlay/messages/OverlayMsg.h" #include "ariba/overlay/messages/JoinRequest.h" #include "ariba/overlay/messages/JoinReply.h" #include "ariba/utility/visual/OvlVis.h" #include "ariba/utility/visual/DddVis.h" #include "ariba/utility/visual/ServerVis.h" #include namespace ariba { namespace overlay { using namespace std; using ariba::transport::system_priority; #define visualInstance ariba::utility::DddVis::instance() #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION // time constants (in seconds) #define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s #define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up #define KEEP_ALIVE_TIME_OUT KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT // timeout: no data received on this link (incl. 
keep-alive messages) #define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s // ---------------------------------------------------------------------------- /* ***************************************************************************** * PREREQUESITES * ****************************************************************************/ CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) { if( !communicationListeners.contains( service ) ) { logging_info( "No listener found for service " << service.toString() ); return NULL; } CommunicationListener* listener = communicationListeners.get( service ); assert( listener != NULL ); return listener; } // link descriptor handling ---------------------------------------------------- LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { foreach( LinkDescriptor* lp, links ) if ((communication ? lp->communicationId : lp->overlayId) == link) return lp; return NULL; } const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { foreach( const LinkDescriptor* lp, links ) if ((communication ? lp->communicationId : lp->overlayId) == link) return lp; return NULL; } /// erases a link descriptor void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) { for ( vector::iterator i = links.begin(); i!= links.end(); i++) { LinkDescriptor* ld = *i; if ((communication ? 
ld->communicationId : ld->overlayId) == link) { delete ld; links.erase(i); break; } } } /// adds a link descriptor LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) { LinkDescriptor* desc = getDescriptor( link ); if ( desc == NULL ) { desc = new LinkDescriptor(); if (!link.isUnspecified()) desc->overlayId = link; links.push_back(desc); } return desc; } /// returns a auto-link descriptor LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { // search for a descriptor that is already up foreach( LinkDescriptor* lp, links ) { if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) ) return lp; } // if not found, search for one that is about to come up... foreach( LinkDescriptor* lp, links ) { time_t now = time(NULL); if (lp->autolink && lp->remoteNode == node && lp->service == service && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT ) return lp; } return NULL; } /// stabilizes link information void BaseOverlay::stabilizeLinks() { time_t now = time(NULL); // send keep-alive messages over established links foreach( LinkDescriptor* ld, links ) { if (!ld->up) continue; if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME ) { logging_debug("[BaseOverlay] Sending KeepAlive over " << ld->to_string() << " after " << difftime( now, ld->keepAliveSent ) << "s"); OverlayMsg msg( OverlayMsg::typeKeepAlive, OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); msg.setRouteRecord(true); ld->keepAliveSent = now; send_link( &msg, ld->overlayId, system_priority::OVERLAY ); } } // iterate over all links and check for time boundaries vector oldlinks; foreach( LinkDescriptor* ld, links ) { // link connection request stale? 
if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request { logging_info( "Link connection request is stale, closing: " << ld ); ld->failed = true; oldlinks.push_back( ld ); continue; } if (!ld->up) continue; // check if link is relayed and retry connecting directly // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) { ld->retryCounter--; ld->communicationId = bc->establishLink( ld->endpoint ); } // remote used as relay flag if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ?? ld->relaying = false; // drop links that are dropped and not used as relay if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) { oldlinks.push_back( ld ); continue; } // auto-link time exceeded? if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) { oldlinks.push_back( ld ); continue; } // keep alives missed? yes-> if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT ) { logging_info( "Link is stale, closing: " << ld ); ld->failed = true; oldlinks.push_back( ld ); continue; } } // drop links foreach( LinkDescriptor* ld, oldlinks ) { logging_info( "Link timed out. Dropping " << ld ); ld->relaying = false; dropLink( ld->overlayId ); } // show link state (debug output) if (counter>=10 || counter<0) { showLinks(); counter = 0; } else { counter++; } } std::string BaseOverlay::getLinkHTMLInfo() { std::ostringstream s; vector nodes; if (links.size()==0) { s << "

No links established!

"; } else { s << "

Links

"; s << ""; s << ""; s << ""; s << ""; int i=0; foreach( LinkDescriptor* ld, links ) { if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue; bool found = false; foreach(NodeID& id, nodes) if (id == ld->remoteNode) found = true; if (found) continue; i++; nodes.push_back(ld->remoteNode); if ((i%1) == 1) s << ""; else s << ""; s << ""; s << ""; s << ""; s << ""; } s << "
Link IDRemote IDRelay path
" << ld->overlayId.toString().substr(0,4) << ".." << ld->remoteNode.toString().substr(0,4) << ".."; if (ld->routeRecord.size()>1 && ld->relayed) { for (size_t i=1; irouteRecord.size(); i++) s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. "; } else { s << "Direct"; } s << "
"; } return s.str(); } /// shows the current link state void BaseOverlay::showLinks() { int i=0; logging_info("--- link state -------------------------------"); foreach( LinkDescriptor* ld, links ) { string epd = ""; if (isLinkDirectVital(ld)) { // epd = getEndpointDescriptor(ld->remoteNode).toString(); epd = "Connection: "; epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string(); epd += " <---> "; epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string(); } logging_info("LINK_STATE: " << i << ": " << ld << " " << epd); i++; } logging_info("----------------------------------------------"); } /// compares two arbitrary links to the same node int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) { LinkDescriptor* lhsld = getDescriptor(lhs); LinkDescriptor* rhsld = getDescriptor(rhs); if (lhsld==NULL || rhsld==NULL || !lhsld->up || !rhsld->up || lhsld->remoteNode != rhsld->remoteNode) return -1; if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) ) return -1; return 1; } // internal message delivery --------------------------------------------------- seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ) { // set priority message->setPriority(priority); // wrap old OverlayMsg into reboost message reboost::message_t wrapped_message = message->wrap_up_for_sending(); // send down to BaseCommunication try { // * send * return bc->sendMessage(bc_link, wrapped_message, priority, false); } catch ( communication::communication_message_not_sent& e ) { ostringstream out; out << "Communication message not sent: " << e.what(); throw message_not_sent(out.str()); } throw logic_error("This should never happen!"); } /// routes a message to its destination node void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop ) { // exceeded time-to-live? 
// yes-> drop message
    // Besides dropping, the relay routes to the destination are forgotten.
    if (message->getNumHops() > message->getTimeToLive()) {
        logging_warn("Message exceeded TTL. Dropping message and relay routes "
                << "for recovery. Hop count: " << (int) message->getNumHops());
        removeRelayNode(message->getDestinationNode());
        return;
    }

    // no-> forward message
    else {
        // destination myself? yes-> only warn; the message is NOT delivered here
        if (message->getDestinationNode() == nodeId) {
            logging_warn("Usually I should not route messages to myself. And I won't!");
        }

        // no->send message to next hop
        else {
            try {
                /* (deep) packet inspection to determine priority */
                // BRANCH: typeData --> send with low priority
                // (the priority already stored in the message is reused)
                if ( message->getType() == OverlayMsg::typeData )
                {
                    // TODO think about implementing explicit routing queue (with active queue management??)
                    send( message,
                          message->getDestinationNode(),
                          message->getPriority(),
                          last_hop );
                }
                // BRANCH: internal message --> send with higher priority
                else
                {
                    send( message,
                          message->getDestinationNode(),
                          system_priority::HIGH,
                          last_hop );
                }
            } catch ( message_not_sent& e ) {
                logging_warn("Unable to route message of type "
                        << message->getType()
                        << " to "
                        << message->getDestinationNode()
                        << ". Reason: "
                        << e.what());

                // inform sender
                // (a failed MessageLost report is not reported again)
                if ( message->getType() != OverlayMsg::typeMessageLost )
                {
                    report_lost_message(message);
                }
            }
        }
    }
}

/**
 * Tells the source node of `message` that the message could not be routed.
 *
 * Sends a typeMessageLost message carrying the lost message's sequence
 * number, type, hop count and source link id. A failure to send this report
 * is logged and swallowed.
 */
void BaseOverlay::report_lost_message( const OverlayMsg* message )
{
    OverlayMsg reply(OverlayMsg::typeMessageLost);
    reply.setSeqNum(message->getSeqNum());

    /**
     * MessageLost-Message
     *
     * - Type of lost message
     * - Hop count of lost message
     * - Source-LinkID  of lost message
     */
    // two bytes: [0] = type, [1] = hop count
    reboost::shared_buffer_t b(sizeof(uint8_t)*2);
    b.mutable_data()[0] = message->getType();
    b.mutable_data()[1] = message->getNumHops();
    reply.append_buffer(b);
    reply.append_buffer(message->getSourceLink().serialize());

    try
    {
        send_node(&reply, message->getSourceNode(),
                system_priority::OVERLAY,
                OverlayInterface::OVERLAY_SERVICE_ID);
    }
    catch ( message_not_sent& e )
    {
        logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
    }
}

/// sends a message to another node, delivers it to the base overlay class
/// Throws message_not_sent when the destination is unspecified or is this node.
seqnum_t BaseOverlay::send( OverlayMsg* message,
        const NodeID& destination,
        uint8_t priority,
        const NodeID& last_hop ) throw(message_not_sent)
{
    LinkDescriptor* next_link = NULL;

    // drop messages to unspecified destinations
    if (destination.isUnspecified())
        throw message_not_sent("No destination specified. Drop!");

    // send messages to myself -> drop!
    // TODO maybe this is not what we want. why not just deliver this message?
    //   There is a similar check in the route function, there it should be okay.
    if (destination == nodeId) {
        logging_warn("Sent message to myself. Drop!");

        throw message_not_sent("Sent message to myself. Drop!");
    }

    // use relay path?
if (message->isRelayed()) { next_link = getRelayLinkTo( destination ); if (next_link != NULL) { next_link->setRelaying(); // * send message over relayed link * return send_overlaymessage_down(message, next_link->communicationId, priority); } else { logging_warn("No relay hop found to " << destination << " -- trying to route over overlay paths ...") } } // last resort -> route over overlay path LinkID next_id = overlayInterface->getNextLinkId( destination ); if ( next_id.isUnspecified() ) { // apperently we're the closest node --> try second best node // NOTE: This is helpful if our routing table is not up-to-date, but // may lead to circles. So we have to be careful. std::vector next_ids = overlayInterface->getSortedLinkIdsTowardsNode( destination ); for ( int i = 0; i < next_ids.size(); i++ ) { const LinkID& link = *next_ids[i]; if ( ! link.isUnspecified() ) { next_id = link; break; } } // still no next hop found. drop. if ( next_id.isUnspecified() ) { logging_warn("Could not send message. No next hop found to " << destination ); logging_error("ERROR: " << debugInformation() ); throw message_not_sent("No next hop found."); } } /* get link descriptor, do some checks and send message */ next_link = getDescriptor(next_id); // check pointer if ( next_link == NULL ) { // NOTE: this shuldn't happen throw message_not_sent("Could not send message. Link not known."); } // avoid circles if ( next_link->remoteNode == last_hop ) { // XXX logging_debug logging_info("Possible next hop would create a circle: " << next_link->remoteNode); throw message_not_sent("Could not send message. Possible next hop would create a circle."); } // check if link is up if ( ! next_link->up) { logging_warn("Could not send message. Link not up"); logging_error("ERROR: " << debugInformation() ); throw message_not_sent("Could not send message. 
Link not up"); } // * send message over overlay link * return send(message, next_link, priority); } /// send a message using a link descriptor, delivers it to the base overlay class seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, uint8_t priority ) throw(message_not_sent) { // check if null if (ldr == NULL) { ostringstream out; out << "Can not send message to " << message->getDestinationAddress(); throw message_not_sent(out.str()); } // check if up if ( !ldr->up ) { logging_error("DEBUG_INFO: " << debugInformation() ); ostringstream out; out << "Can not send message. Link not up:" << ldr->to_string(); throw message_not_sent(out.str()); } LinkDescriptor* next_hop_ld = NULL; // BRANCH: relayed link if (ldr->relayed) { logging_debug("Resolving direct link for relayed link to " << ldr->remoteNode); next_hop_ld = getRelayLinkTo( ldr->remoteNode ); if (next_hop_ld==NULL) { logging_error("DEBUG_INFO: " << debugInformation() ); ostringstream out; out << "No relay path found to link: " << ldr; throw message_not_sent(out.str()); } next_hop_ld->setRelaying(); message->setRelayed(true); } // BRANCH: direct link else { next_hop_ld = ldr; } // check next hop-link if ( ! next_hop_ld->communicationUp) { throw message_not_sent( "send(): Could not send message." " Not a relayed link and direct link is not up." 
); } // send over next link logging_debug("send(): Sending message over direct link."); return send_overlaymessage_down(message, next_hop_ld->communicationId, priority); } seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, uint8_t priority, const ServiceID& service) throw(message_not_sent) { message->setSourceNode(nodeId); message->setDestinationNode(remote); message->setService(service); return send( message, remote, priority ); } void BaseOverlay::send_link( OverlayMsg* message, const LinkID& link, uint8_t priority ) throw(message_not_sent) { LinkDescriptor* ld = getDescriptor(link); if (ld==NULL) { throw message_not_sent("Cannot find descriptor to link id=" + link.toString()); } message->setSourceNode(nodeId); message->setDestinationNode(ld->remoteNode); message->setSourceLink(ld->overlayId); message->setDestinationLink(ld->remoteLink); message->setService(ld->service); message->setRelayed(ld->relayed); try { // * send message * send( message, ld, priority ); } catch ( message_not_sent& e ) { // drop failed link ld->failed = true; dropLink(ld->overlayId); } } // relay route management ------------------------------------------------------ /// stabilize relay information void BaseOverlay::stabilizeRelays() { vector::iterator i = relay_routes.begin(); while (i!=relay_routes.end() ) { relay_route& route = *i; LinkDescriptor* ld = getDescriptor(route.link); // relay link still used and alive? if (ld==NULL || !isLinkDirectVital(ld) || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better? 
{ logging_info("Forgetting relay information to node " << route.node.toString() ); i = relay_routes.erase(i); } else i++; } } void BaseOverlay::removeRelayLink( const LinkID& link ) { vector::iterator i = relay_routes.begin(); while (i!=relay_routes.end() ) { relay_route& route = *i; if (route.link == link ) i = relay_routes.erase(i); else i++; } } void BaseOverlay::removeRelayNode( const NodeID& remote ) { vector::iterator i = relay_routes.begin(); while (i!=relay_routes.end() ) { relay_route& route = *i; if (route.node == remote ) i = relay_routes.erase(i); else i++; } } /// refreshes relay information void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) { // handle relayed messages from real links only if (ld == NULL || ld->relayed || message->getSourceNode()==nodeId ) return; // update usage information if (message->isRelayed()) { // try to find source node foreach( relay_route& route, relay_routes ) { // relay route found? yes-> if ( route.node == message->getDestinationNode() ) { ld->setRelaying(); route.used = time(NULL); } } } // register relay path if (message->isRegisterRelay()) { // set relaying ld->setRelaying(); // try to find source node foreach( relay_route& route, relay_routes ) { // relay route found? yes-> if ( route.node == message->getSourceNode() ) { // refresh timer route.used = time(NULL); LinkDescriptor* rld = getDescriptor(route.link); // route has a shorter hop count or old link is dead? 
yes-> replace if (route.hops > message->getNumHops() || rld == NULL || !isLinkDirectVital(ld)) { logging_info("Updating relay information to node " << route.node.toString() << " reducing to " << (int) message->getNumHops() << " hops."); route.hops = message->getNumHops(); route.link = ld->overlayId; } return; } } // not found-> add new entry relay_route route; route.hops = message->getNumHops(); route.link = ld->overlayId; route.node = message->getSourceNode(); route.used = time(NULL); logging_info("Remembering relay information to node " << route.node.toString()); relay_routes.push_back(route); } } /// returns a known "vital" relay link which is up and running LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) { // try to find source node foreach( relay_route& route, relay_routes ) { if (route.node == remote ) { LinkDescriptor* ld = getDescriptor( route.link ); if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else { route.used = time(NULL); return ld; } } } return NULL; } /* ***************************************************************************** * PUBLIC MEMBERS * ****************************************************************************/ use_logging_cpp(BaseOverlay); // ---------------------------------------------------------------------------- BaseOverlay::BaseOverlay() : started(false),state(BaseOverlayStateInvalid), bc(NULL), nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED), sideport(&SideportListener::DEFAULT), overlayInterface(NULL), counter(0) { } BaseOverlay::~BaseOverlay() { } // ---------------------------------------------------------------------------- void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) { logging_info("Starting..."); // set parameters bc = _basecomm; nodeId = _nodeid; // register at base communication bc->registerMessageReceiver( this ); bc->registerEventListener( this ); // timer for auto link management Timer::setInterval( 1000 ); // XXX // Timer::setInterval( 
10000 ); Timer::start(); started = true; state = BaseOverlayStateInvalid; } void BaseOverlay::stop() { logging_info("Stopping..."); // stop timer Timer::stop(); // delete oberlay interface if(overlayInterface != NULL) { delete overlayInterface; overlayInterface = NULL; } // unregister at base communication bc->unregisterMessageReceiver( this ); bc->unregisterEventListener( this ); started = false; state = BaseOverlayStateInvalid; } bool BaseOverlay::isStarted(){ return started; } // ---------------------------------------------------------------------------- void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp) { if(id != spovnetId){ logging_error("attempt to join against invalid spovnet, call initiate first"); return; } //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); logging_info( "Starting to join spovnet " << id.toString() << " with nodeid " << nodeId.toString()); if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){ //** FIRST STEP - MANDATORY */ // bootstrap against ourselfs logging_info("joining spovnet locally"); overlayInterface->joinOverlay(); state = BaseOverlayStateCompleted; foreach( NodeListener* i, nodeListeners ) i->onJoinCompleted( spovnetId ); //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN ); } else { //** SECOND STEP - OPTIONAL */ // bootstrap against another node logging_info("joining spovnet remotely against " << bootstrapEp.toString()); const LinkID& lnk = bc->establishLink( bootstrapEp ); bootstrapLinks.push_back(lnk); logging_info("join process initiated for " << id.toString() << "..."); } } void BaseOverlay::startBootstrapModules(vector > modules){ logging_debug("starting overlay bootstrap module"); overlayBootstrap.start(this, spovnetId, nodeId, modules); overlayBootstrap.publish(bc->getEndpointDescriptor()); } void BaseOverlay::stopBootstrapModules(){ logging_debug("stopping overlay bootstrap 
module"); overlayBootstrap.stop(); overlayBootstrap.revoke(); } void BaseOverlay::leaveSpoVNet() { logging_info( "Leaving spovnet " << spovnetId ); bool ret = ( state != this->BaseOverlayStateInvalid ); logging_debug( "Dropping all auto-links" ); // gather all service links vector servicelinks; foreach( LinkDescriptor* ld, links ) { if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) servicelinks.push_back( ld->overlayId ); } // drop all service links foreach( LinkID lnk, servicelinks ) { logging_debug("Dropping service link " << lnk.toString()); dropLink( lnk ); } // let the node leave the spovnet overlay interface logging_debug( "Leaving overlay" ); if( overlayInterface != NULL ) { overlayInterface->leaveOverlay(); } // drop still open bootstrap links foreach( LinkID lnk, bootstrapLinks ) { logging_debug("Dropping bootstrap link " << lnk.toString()); bc->dropLink( lnk ); } // change to inalid state state = BaseOverlayStateInvalid; //ovl.visShutdown( ovlId, nodeId, string("") ); visualInstance.visShutdown(visualIdOverlay, nodeId, ""); visualInstance.visShutdown(visualIdBase, nodeId, ""); // inform all registered services of the event foreach( NodeListener* i, nodeListeners ) { if( ret ) i->onLeaveCompleted( spovnetId ); else i->onLeaveFailed( spovnetId ); } } void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos) { // set the state that we are an initiator, this way incoming messages are // handled correctly logging_info( "creating spovnet " + id.toString() << " with nodeid " << nodeId.toString() ); spovnetId = id; overlayInterface = OverlayFactory::create( *this, param, nodeId, this ); if( overlayInterface == NULL ) { logging_fatal( "overlay structure not supported" ); state = BaseOverlayStateInvalid; foreach( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return; } visualInstance.visCreate(visualIdBase, nodeId, "", ""); 
visualInstance.visCreate(visualIdOverlay, nodeId, "", ""); } // ---------------------------------------------------------------------------- const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp, const NodeID& remoteId, const ServiceID& service ) { // establish link via overlay if (!remoteId.isUnspecified()) return establishLink( remoteId, service ); else return establishDirectLink(remoteEp, service ); } /// call base communication's establish link and add link mapping const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep, const ServiceID& service ) { /// find a service listener if( !communicationListeners.contains( service ) ) { logging_error( "No listener registered for service id=" << service.toString() ); return LinkID::UNSPECIFIED; } CommunicationListener* listener = communicationListeners.get( service ); assert( listener != NULL ); // create descriptor LinkDescriptor* ld = addDescriptor(); ld->relayed = false; ld->listener = listener; ld->service = service; ld->communicationId = bc->establishLink( ep ); /// establish link and add mapping logging_info("Establishing direct link " << ld->communicationId.toString() << " using " << ep.toString()); return ld->communicationId; } /// establishes a link between two arbitrary nodes const LinkID BaseOverlay::establishLink( const NodeID& remote, const ServiceID& service ) { // TODO What if we already have a Link to this node and this service id? // do not establish a link to myself! 
if (remote == nodeId) return LinkID::UNSPECIFIED; // create a link descriptor LinkDescriptor* ld = addDescriptor(); ld->relayed = true; ld->remoteNode = remote; ld->service = service; ld->listener = getListener(ld->service); // initialize sequence numbers ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum); // create link request message OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote ); msg.setSourceLink(ld->overlayId); // send over relayed link msg.setRelayed(true); msg.setRegisterRelay(true); // msg.setRouteRecord(true); msg.setSeqNum(ld->last_sent_seqnum); // debug message logging_info( "Sending link request with" << " link=" << ld->overlayId.toString() << " node=" << ld->remoteNode.toString() << " serv=" << ld->service.toString() ); // sending message to node try { // * send * seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service ); } catch ( message_not_sent& e ) { logging_warn("Link request not sent: " << e.what()); // Message not sent. Cancel link request. SystemQueue::instance().scheduleCall( boost::bind( &BaseOverlay::__onLinkEstablishmentFailed, this, ld->overlayId) ); } return ld->overlayId; } /// NOTE: "id" is an Overlay-LinkID void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id) { // TODO This code redundant. But also it's not easy to aggregate in one function. // get descriptor for link LinkDescriptor* ld = getDescriptor(id, false); if ( ld == NULL ) return; // not found? ->ignore! 
logging_debug( "__onLinkEstablishmentFaild: " << ld ); // removing relay link information removeRelayLink(ld->overlayId); // inform listeners about link down ld->communicationUp = false; if (!ld->service.isUnspecified()) { CommunicationListener* lst = getListener(ld->service); if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode ); sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); } // delete all queued messages (auto links) if( ld->messageQueue.size() > 0 ) { logging_warn( "Dropping link " << id.toString() << " that has " << ld->messageQueue.size() << " waiting messages" ); ld->flushQueue(); } // erase mapping eraseDescriptor(ld->overlayId); } /// drops an established link void BaseOverlay::dropLink(const LinkID& link) { logging_info( "Dropping link: " << link.toString() ); // find the link item to drop LinkDescriptor* ld = getDescriptor(link); if( ld == NULL ) { logging_warn( "Can't drop link, link is unknown!"); return; } // delete all queued messages if( ld->messageQueue.size() > 0 ) { logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " << ld->messageQueue.size() << " waiting messages" ); ld->flushQueue(); } // inform application and remote note (but only once) // NOTE: If we initiated the drop, this function is called twice, but on // the second call, there is noting to do. if ( ld->up && ! 
ld->failed ) {
		// inform sideport and listener
		if(ld->listener != NULL) {
			ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
		}
		sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );

		// send link-close to remote node
		logging_info("Sending LinkClose message to remote node.");
		OverlayMsg close_msg(OverlayMsg::typeLinkClose);
		send_link(&close_msg, link, system_priority::OVERLAY);

		// deactivate link
		ld->up = false;
		// ld->closing = true;
	}
	else if ( ld->failed ) {
		// inform listener
		if( ld->listener != NULL ) {
			ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
		}

		ld->up = false;
		// a failed link gets no close handshake: remove it right away
		__removeDroppedLink(ld->overlayId);
	}
}

/// called from typeLinkClose-handler
///
/// Removes the descriptor of an overlay link. If the descriptor is marked
/// as relaying, it is kept and only flagged with dropAfterRelaying.
/// Unknown link ids are silently ignored.
void BaseOverlay::__removeDroppedLink(const LinkID& link)
{
	// find the link item to drop
	LinkDescriptor* ld = getDescriptor(link);
	if( ld == NULL ) {
		return;
	}

	// do not drop relay links
	if (!ld->relaying) {
		// drop the link in base communication
		if (ld->communicationUp) {
			bc->dropLink( ld->communicationId );
		}

		// erase descriptor
		eraseDescriptor( ld->overlayId );
	}
	else {
		ld->dropAfterRelaying = true;
	}
}

// ----------------------------------------------------------------------------

/// internal send message, always use this functions to send messages over links
///
/// @param message  payload to send
/// @param link     overlay link id to send on
/// @param priority priority handed to the lower layers
/// @return the link's last_sent_seqnum after incrementing it, or
///         SequenceNumber::DISABLED if the message was only queued on an
///         auto-link that is not up yet
/// @throws message_not_sent if the link is unknown, if it is down and not an
///         auto-link, or if base communication rejects the fast-path send
const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
		const LinkID& link,
		uint8_t priority ) throw(message_not_sent)
{
	logging_debug( "Sending data message on link " << link.toString() );

	// get the mapping for this link
	LinkDescriptor* ld = getDescriptor(link);
	if( ld == NULL ) {
		throw message_not_sent("Could not send message. Link not found id=" + link.toString());
	}

	// check if the link is up yet, if its an auto link queue message
	if( !ld->up ) {
		ld->setAutoUsed();
		if( ld->autolink ) {
			logging_info("Auto-link " << link.toString() << " not up, queue message");

			// queue message
			LinkDescriptor::message_queue_entry msg;
			msg.message = message;
			msg.priority = priority;

			ld->messageQueue.push_back( msg );

			return SequenceNumber::DISABLED; // TODO what to return if message is queued?
		} else {
			throw message_not_sent("Link " + link.toString() + " not up, drop message");
		}
	}

	// TODO (current): sequence numbers
	// TODO seqnum on fast path ?
	// NOTE: incremented on both paths, but only attached to the message on
	// the overlay path (and only if transmit_seqnums is set)
	ld->last_sent_seqnum.increment();

	/* choose fast-path for direct links; normal overlay-path otherwise */
	// BRANCH: direct link
	if ( ld->communicationUp && !ld->relayed )
	{
		// * send down to BaseCommunication *
		try
		{
			bc->sendMessage(ld->communicationId, message, priority, true);
		}
		catch ( communication::communication_message_not_sent& e )
		{
			// translate the lower-layer exception into the overlay's own type
			ostringstream out;
			out << "Communication message on fast-path not sent: " << e.what();
			throw message_not_sent(out.str());
		}
	}
	// BRANCH: use (slow) overlay-path
	else
	{
		// compile overlay message (has service and node id)
		OverlayMsg overmsg( OverlayMsg::typeData );
		overmsg.set_payload_message(message);

		// set SeqNum
		if ( ld->transmit_seqnums )
		{
			overmsg.setSeqNum(ld->last_sent_seqnum);
		}
		logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());

		// send message over relay/direct/overlay
		send_link( &overmsg, ld->overlayId, priority );
	}

	// return seqnum
	return ld->last_sent_seqnum;
}

/// Sends a message to a node for a given service.
///
/// Looks up an auto-link descriptor for (node, service). If none is found,
/// a link is established and flagged as auto-link. The message is then
/// handed to the link-based sendMessage(), which sends or queues it.
///
/// @throws message_not_sent if no descriptor exists after establishLink()
const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
		const NodeID& node, uint8_t priority, const ServiceID& service) {

	// find link for node and service
	LinkDescriptor* ld = getAutoDescriptor( node, service );

	// if we found no link, create an auto link
	if( ld == NULL ) {

		// debug output
		logging_info( "No link to send message to node "
				<< node.toString() << " found for service "
				<< service.toString() << ". Creating auto link ..."
		);

		// call base overlay to create a link
		LinkID link = establishLink( node, service );
		ld = getDescriptor( link );
		if( ld == NULL ) {
			logging_error( "Failed to establish auto-link.");
			throw message_not_sent("Failed to establish auto-link.");
		}
		ld->autolink = true;

		logging_debug( "Auto-link establishment in progress to node "
				<< node.toString() << " with link id=" << link.toString() );
	}
	assert(ld != NULL);

	// mark the link as used, as we now send a message through it
	ld->setAutoUsed();

	// send / queue message
	return sendMessage( message, ld->overlayId, priority );
}

/// Sends a message one step closer to the given address.
///
/// @return NodeID::UNSPECIFIED if the overlay reports this node as the
///         closest one (nothing is sent); otherwise the next node id as
///         returned by the overlay. The message is only sent if that id is
///         not UNSPECIFIED.
// NOTE(review): overlayInterface is dereferenced without a NULL check here;
// presumably callers only use this after a successful join -- confirm.
NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message, const NodeID& address,
		uint8_t priority, const ServiceID& service)
{
	if ( overlayInterface->isClosestNodeTo(address) )
	{
		return NodeID::UNSPECIFIED;
	}

	const NodeID& closest_node = overlayInterface->getNextNodeId(address);

	if ( closest_node != NodeID::UNSPECIFIED )
	{
		sendMessage(message, closest_node, priority, service);
	}

	return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference?
}

// ----------------------------------------------------------------------------

/// Returns the end-point descriptor belonging to an overlay link.
///
/// An unspecified link id yields this node's own descriptor; an unknown
/// link id yields EndpointDescriptor::UNSPECIFIED().
const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
		const LinkID& link) const {

	// return own end-point descriptor
	if( link.isUnspecified() )
		return bc->getEndpointDescriptor();

	// find link descriptor. not found -> return unspecified
	const LinkDescriptor* ld = getDescriptor(link);
	if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();

	// return endpoint-descriptor from base communication
	return bc->getEndpointDescriptor( ld->communicationId );
}

/// Returns the end-point descriptor of a node.
///
/// The own node id and an unspecified node id both yield this node's own
/// descriptor.
const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
		const NodeID& node) const {

	// return own end-point descriptor
	if( node == nodeId || node.isUnspecified() ) {
		//logging_info("getEndpointDescriptor: returning self.");
		return bc->getEndpointDescriptor();
	}

	// no joined and request remote descriptor? -> fail!
if( overlayInterface == NULL ) { logging_error( "Overlay interface not set, cannot resolve end-point." ); return EndpointDescriptor::UNSPECIFIED(); } // // resolve end-point descriptor from the base-overlay routing table // const EndpointDescriptor& ep = overlayInterface->resolveNode( node ); // if(ep.toString() != "") return ep; // see if we can find the node in our own table foreach(const LinkDescriptor* ld, links){ if(ld->remoteNode != node) continue; if(!ld->communicationUp) continue; const EndpointDescriptor& ep = bc->getEndpointDescriptor(ld->communicationId); if(ep != EndpointDescriptor::UNSPECIFIED()) { //logging_info("getEndpointDescriptor: using " << ld->to_string()); return ep; } } logging_warn( "No EndpointDescriptor found for node " << node ); logging_warn( const_cast(this)->debugInformation() ); return EndpointDescriptor::UNSPECIFIED(); } // ---------------------------------------------------------------------------- bool BaseOverlay::registerSidePort(SideportListener* _sideport) { sideport = _sideport; _sideport->configure( this ); return true; } bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) { sideport = &SideportListener::DEFAULT; return true; } // ---------------------------------------------------------------------------- bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) { logging_debug( "binding communication listener " << listener << " on serviceid " << sid.toString() ); if( communicationListeners.contains( sid ) ) { logging_error( "some listener already registered for service id " << sid.toString() ); return false; } communicationListeners.registerItem( listener, sid ); return true; } bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) { logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() ); if( !communicationListeners.contains( sid ) ) { logging_warn( "cannot unbind listener. 
no listener registered on service id " << sid.toString() ); return false; } if( communicationListeners.get(sid) != listener ) { logging_warn( "listener bound to service id " << sid.toString() << " is different than listener trying to unbind" ); return false; } communicationListeners.unregisterItem( sid ); return true; } // ---------------------------------------------------------------------------- bool BaseOverlay::bind(NodeListener* listener) { logging_debug( "Binding node listener " << listener ); // already bound? yes-> warning NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i != nodeListeners.end() ) { logging_warn("Node listener " << listener << " is already bound!" ); return false; } // no-> add nodeListeners.push_back( listener ); return true; } bool BaseOverlay::unbind(NodeListener* listener) { logging_debug( "Unbinding node listener " << listener ); // already unbound? yes-> warning NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i == nodeListeners.end() ) { logging_warn( "Node listener " << listener << " is not bound!" ); return false; } // no-> remove nodeListeners.erase( i ); return true; } // ---------------------------------------------------------------------------- void BaseOverlay::onLinkUp(const LinkID& id, const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) { logging_debug( "Link up with base communication link id=" << id ); // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); // BRANCH: handle bootstrap link we initiated if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){ logging_info( "Join has been initiated by me and the link is now up. 
" << "LinkID: " << id.toString() << "Sending out join request for SpoVNet " << spovnetId.toString() ); // send join request message OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); JoinRequest joinRequest( spovnetId, nodeId ); overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer()); send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); return; } // BRANCH: link establishment from remote, add one! if (ld == NULL) { ld = addDescriptor( id ); logging_info( "onLinkUp (remote request) descriptor: " << ld ); // update descriptor ld->fromRemote = true; ld->communicationId = id; ld->communicationUp = true; ld->setAutoUsed(); ld->setAlive(); // in this case, do not inform listener, since service it unknown // -> wait for update message! } // BRANCH: We requested this link in the first place else { logging_info( "onLinkUp descriptor (initiated locally):" << ld ); // update descriptor ld->setAutoUsed(); ld->setAlive(); ld->communicationUp = true; ld->fromRemote = false; // BRANCH: this was a relayed link before --> convert to direct link // TODO do we really have to send a message here? if (ld->relayed) { ld->up = true; ld->relayed = false; logging_info( "Converting to direct link: " << ld ); // send message OverlayMsg overMsg( OverlayMsg::typeLinkDirect ); overMsg.setSourceLink( ld->overlayId ); overMsg.setDestinationLink( ld->remoteLink ); send_link( &overMsg, ld->overlayId, system_priority::OVERLAY ); // inform listener if( ld->listener != NULL) ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); } /* NOTE: Chord is opening direct-links in it's setup routine which are * neither set to "relayed" nor to "up". To activate these links a * typeLinkUpdate must be sent. * * This branch is would also be taken when we had a working link before * (ld->up == true). I'm not sure if this case does actually happen * and whether it's tested. 
*/ else { // note: necessary to validate the link on the remote side! logging_info( "Sending out update" << " for service " << ld->service.toString() << " with local node id " << nodeId.toString() << " on link " << ld->overlayId.toString() ); // compile and send update message OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate ); overlayMsg.setAutoLink( ld->autolink ); overlayMsg.setSourceNode(nodeId); overlayMsg.setDestinationNode(ld->remoteNode); overlayMsg.setSourceLink(ld->overlayId); overlayMsg.setDestinationLink(ld->remoteLink); overlayMsg.setService(ld->service); overlayMsg.setRelayed(false); // TODO ld->communicationId = id ?? send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); } } } void BaseOverlay::onLinkDown(const LinkID& id, const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) { // erase bootstrap links vector::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); if ( ld == NULL ) return; // not found? ->ignore! 
logging_info( "onLinkDown descriptor: " << ld );

	// removing relay link information
	removeRelayLink(ld->overlayId);

	// inform listeners about link down
	ld->communicationUp = false;
	// links without a service are not reported to listener or sideport
	if (!ld->service.isUnspecified()) {
		CommunicationListener* lst = getListener(ld->service);
		if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
		sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
	}

	// delete all queued messages (auto links)
	if( ld->messageQueue.size() > 0 ) {
		logging_warn( "Dropping link " << id.toString() << " that has "
				<< ld->messageQueue.size() << " waiting messages" );
		ld->flushQueue();
	}

	// erase mapping
	eraseDescriptor(ld->overlayId);
}

/// Base-communication callback: the link with the given id failed.
/// Currently only logs and delegates to onLinkDown(); the dedicated
/// failure handling below is disabled.
void BaseOverlay::onLinkFail(const LinkID& id,
		const addressing2::EndpointPtr local,
		const addressing2::EndpointPtr remote)
{
	logging_debug( "Link fail with base communication link id=" << id );

//	// erase bootstrap links
//	vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
//	if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
//
//	// get descriptor for link
//	LinkDescriptor* ld = getDescriptor(id, true);
//	if ( ld == NULL ) return; // not found? ->ignore!
//	logging_debug( "Link failed id=" << ld->overlayId.toString() );
//
//	// inform listeners
//	ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
//	sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );

	logging_debug( " ... calling onLinkDown ..." );
	onLinkDown(id, local, remote);
}

/// Base-communication callback: the end-points of a link changed.
/// Unknown link ids are ignored.
void BaseOverlay::onLinkChanged(const LinkID& id,
		const addressing2::EndpointPtr oldlocal,
		const addressing2::EndpointPtr newlocal,
		const addressing2::EndpointPtr oldremote,
		const addressing2::EndpointPtr newremote)
{
	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);
	if ( ld == NULL ) return; // not found? ->ignore!
logging_debug( "onLinkChanged descriptor: " << ld ); // inform listeners ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); // autolinks: refresh timestamp ld->setAutoUsed(); } //void BaseOverlay::onLinkQoSChanged(const LinkID& id, // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, // const QoSParameterSet& qos) //{ // logging_debug( "Link quality changed with base communication link id=" << id ); // // // get descriptor for link // LinkDescriptor* ld = getDescriptor(id, true); // if ( ld == NULL ) return; // not found? ->ignore! // logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); //} bool BaseOverlay::onLinkRequest(const LinkID& id, const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) { logging_debug("Accepting link request from " << remote->to_string() ); // TODO ask application..? return true; } /// handles a message from base communication bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message, const LinkID& link, const NodeID&, bool bypass_overlay ) { // get descriptor for link LinkDescriptor* ld = getDescriptor( link, true ); /* choose fastpath for direct links; normal overlay-path otherwise */ if ( bypass_overlay && ld ) { // message received --> link is alive ld->keepAliveReceived = time(NULL); // hop count on this link ld->hops = 0; // hand over to CommunicationListener (aka Application) CommunicationListener* lst = getListener(ld->service); if ( lst != NULL ) { lst->onMessage( message, ld->remoteNode, ld->overlayId, SequenceNumber::DISABLED, NULL ); return true; } return false; } else { return handleMessage( message, ld, link ); } } // ---------------------------------------------------------------------------- /// Handle spovnet instance join requests bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ) { // 
decapsulate message JoinRequest joinReq; joinReq.deserialize_from_shared_buffer(message); logging_info( "Received join request for spovnet " << joinReq.getSpoVNetID().toString() ); // check spovnet id if( joinReq.getSpoVNetID() != spovnetId ) { logging_error( "Received join request for spovnet we don't handle " << joinReq.getSpoVNetID().toString() ); return false; } // TODO: here you can implement mechanisms to deny joining of a node bool allow = true; logging_info( "Sending join reply for spovnet " << spovnetId.toString() << " to node " << source.toString() << ". Result: " << (allow ? "allowed" : "denied") ); joiningNodes.push_back( source ); // return overlay parameters assert( overlayInterface != NULL ); logging_debug( "Using bootstrap end-point " << getEndpointDescriptor().toString() ) OverlayParameterSet parameters = overlayInterface->getParameters(); // create JoinReplay Message OverlayMsg retmsg( OverlayMsg::typeJoinReply, OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); JoinReply replyMsg( spovnetId, parameters, allow ); retmsg.append_buffer(replyMsg.serialize_into_shared_buffer()); // XXX This is unlovely clash between the old message system and the new one, // but a.t.m. we can't migrate everything to the new system at once.. // ---> Consider the EndpointDescriptor as part of the JoinReply.. retmsg.append_buffer(getEndpointDescriptor().serialize()); // * send * send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY); return true; } /// Handle replies to spovnet instance join requests bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ) { // decapsulate message logging_debug("received join reply message"); JoinReply replyMsg; EndpointDescriptor endpoints; reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message); buff = endpoints.deserialize(buff); // correct spovnet? 
if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail logging_error( "Received SpoVNet join reply for " << replyMsg.getSpoVNetID().toString() << " != " << spovnetId.toString() ); return false; } // access granted? no -> fail if( !replyMsg.getJoinAllowed() ) { logging_error( "Our join request has been denied" ); // drop initiator link if( !bcLink.isUnspecified() ){ bc->dropLink( bcLink ); vector::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); if( it != bootstrapLinks.end() ) bootstrapLinks.erase(it); } // inform all registered services of the event foreach( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return true; } // access has been granted -> continue! logging_info("Join request has been accepted for spovnet " << spovnetId.toString() ); logging_debug( "Using bootstrap end-point " << endpoints.toString() ); // create overlay structure from spovnet parameter set // if we have not boostrapped yet against some other node if( overlayInterface == NULL ){ logging_debug("first-time bootstrapping"); overlayInterface = OverlayFactory::create( *this, replyMsg.getParam(), nodeId, this ); // overlay structure supported? no-> fail! if( overlayInterface == NULL ) { logging_error( "overlay structure not supported" ); if( !bcLink.isUnspecified() ){ bc->dropLink( bcLink ); vector::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), bcLink); if( it != bootstrapLinks.end() ) bootstrapLinks.erase(it); } // inform all registered services of the event foreach( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return true; } // everything ok-> join the overlay! 
state = BaseOverlayStateCompleted;
		overlayInterface->createOverlay();

		overlayInterface->joinOverlay( endpoints );
		overlayBootstrap.recordJoin( endpoints );

		// update ovlvis
		//ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);

		// inform all registered services of the event
		foreach( NodeListener* i, nodeListeners )
			i->onJoinCompleted( spovnetId );
	}
	else {
		// this is not the first bootstrap, just join the additional node
		logging_debug("not first-time bootstrapping");
		overlayInterface->joinOverlay( endpoints );
		overlayBootstrap.recordJoin( endpoints );
	} // if( overlayInterface == NULL )

	return true;
}

/// Delivers a data message to the listener registered for the link's
/// service. The service is taken from the link descriptor, not from the
/// message. Always returns true, even if no listener is registered.
///
/// @param message    payload handed to the listener
/// @param overlayMsg overlay header (sequence number, destination link)
/// @param ld         descriptor of the link; must not be NULL
bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
{
	// get service
	const ServiceID& service = ld->service; //overlayMsg->getService();

	logging_debug( "Received data for service " << service.toString()
			<< " on link " << overlayMsg->getDestinationLink().toString() );

	// delegate data message
	CommunicationListener* lst = getListener(service);
	if(lst != NULL){
		lst->onMessage(
				message,
//				overlayMsg->getSourceNode(),
//				overlayMsg->getDestinationLink(),
				ld->remoteNode,
				ld->overlayId,
				overlayMsg->getSeqNum(),
				overlayMsg
		);
	}

	return true;
}

/// Handles a notification that one of our messages was lost on its way.
bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
{
	/**
	 * Deserialize MessageLost-Message
	 *
	 * - Type of lost message
	 * - Hop count of lost message
	 * - Source-LinkID of lost message
	 */
	// the first two bytes are type and hop count, the rest is the LinkID
	const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
	uint8_t type = buff[0];
	uint8_t hops = buff[1];
	LinkID linkid;
	linkid.deserialize(message(sizeof(uint8_t)*2));

	// NOTE(review): the log text opens " (LinkID: " but never closes the
	// parenthesis
	logging_warn("Node " << msg->getSourceNode()
			<< " informed us, that our message of type " << (int) type
			<< " is lost after traveling " << (int) hops << " hops."
			<< " (LinkID: " << linkid.toString());

	// TODO switch-case ?
// BRANCH: LinkRequest --> link request failed if ( type == OverlayMsg::typeLinkRequest ) { __onLinkEstablishmentFailed(linkid); } // BRANCH: Data --> link disrupted. Drop link. // (We could use something more advanced here. e.g. At least send a // keep-alive message and wait for a keep-alive reply.) if ( type == OverlayMsg::typeData ) { LinkDescriptor* link_desc = getDescriptor(linkid); if ( link_desc ) { link_desc->failed = true; } dropLink(linkid); } // BRANCH: ping lost if ( type == OverlayMsg::typePing ) { CommunicationListener* lst = getListener(msg->getService()); if( lst != NULL ) { lst->onPingLost(msg->getSourceNode()); } } return true; } bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node) bool send_pong = false; // inform application and ask permission to send a pong message CommunicationListener* lst = getListener(overlayMsg->getService()); if( lst != NULL ) { send_pong = lst->onPing(overlayMsg->getSourceNode()); } // send pong message if allowed if ( send_pong ) { OverlayMsg pong_msg(OverlayMsg::typePong); pong_msg.setSeqNum(overlayMsg->getSeqNum()); // send message try { send_node( &pong_msg, overlayMsg->getSourceNode(), system_priority::OVERLAY, overlayMsg->getService() ); } catch ( message_not_sent& e ) { logging_info("Could not send Pong-Message to node: " << overlayMsg->getSourceNode()); } } } bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { // inform application CommunicationListener* lst = getListener(overlayMsg->getService()); if( lst != NULL ) { lst->onPong(overlayMsg->getSourceNode()); } } bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { if( ld == NULL ) { logging_warn( "received overlay update message for link for " << "which we have no mapping" ); return false; } logging_info("Received type update message on link " << ld ); // update our link mapping information for this link 
bool changed =
			( ld->remoteNode != overlayMsg->getSourceNode() ) ||
			( ld->service != overlayMsg->getService() );

	// set parameters
	// NOTE: up is set here already, before the service had a chance to
	// accept or deny the link further below
	ld->up = true;
	ld->remoteNode = overlayMsg->getSourceNode();
	ld->remoteLink = overlayMsg->getSourceLink();
	ld->service = overlayMsg->getService();
	ld->autolink = overlayMsg->isAutoLink();

	// if our link information changed, we send out an update, too
	if( changed ) {
		// reuse the received message as the answer
		overlayMsg->swapRoles();
		overlayMsg->setSourceNode(nodeId);
		overlayMsg->setSourceLink(ld->overlayId);
		overlayMsg->setService(ld->service);
		send( overlayMsg, ld, system_priority::OVERLAY );
	}

	// service registered? no-> error!
	if( !communicationListeners.contains( ld->service ) ) {
		logging_warn( "Link up: event listener has not been registered" );
		return false;
	}

	// default or no service registered?
	CommunicationListener* listener = communicationListeners.get( ld->service );
	if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
		logging_warn("Link up: event listener is default or null!" );
		return true;
	}

	// update descriptor
	ld->listener = listener;
	ld->setAutoUsed();
	ld->setAlive();

	// ask the service whether it wants to accept this link
	if( !listener->onLinkRequest(ld->remoteNode) ) {

		logging_debug("Link id=" << ld->overlayId.toString() <<
				" has been denied by service " << ld->service.toString() << ", dropping link");

		// prevent onLinkDown calls to the service
		ld->listener = &CommunicationListener::DEFAULT;

		// drop the link
		dropLink( ld->overlayId );
		return true;
	}

	// set link up
	ld->up = true;
	logging_info( "Link has been accepted by service and is up: " << ld );

	// auto links: link has been accepted -> send queued messages
	if( ld->messageQueue.size() > 0 ) {
		logging_info( "Sending out queued messages on link " << ld );
		foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
		{
			sendMessage( msg.message, ld->overlayId, msg.priority );
		}
		ld->messageQueue.clear();
	}

	// call the notification functions
	listener->onLinkUp( ld->overlayId, ld->remoteNode );
	sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );

	return true;
}

/// handle a link request and reply
///
/// @param overlayMsg the received request; it is modified and sent back as
///                   the reply
/// @param ld         descriptor of the link the request arrived on
bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {

	//TODO: Check if a request has already been sent using getSourceLink() ...
// create link descriptor LinkDescriptor* ldn = addDescriptor(); // flags ldn->up = true; ldn->fromRemote = true; ldn->relayed = true; // parameters ldn->service = overlayMsg->getService(); ldn->listener = getListener(ldn->service); ldn->remoteNode = overlayMsg->getSourceNode(); ldn->remoteLink = overlayMsg->getSourceLink(); ldn->hops = overlayMsg->getNumHops(); // initialize sequence numbers ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum); // update time-stamps ldn->setAlive(); ldn->setAutoUsed(); logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() << " LINK: " << ldn); // create reply message and send back! overlayMsg->swapRoles(); // swap source/destination overlayMsg->setType(OverlayMsg::typeLinkReply); overlayMsg->setSourceLink(ldn->overlayId); overlayMsg->setRelayed(true); // overlayMsg->setRouteRecord(true); overlayMsg->setSeqNum(ld->last_sent_seqnum); // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!! // append our endpoints (for creation of a direct link) overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize()); send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link // inform listener if(ldn != NULL && ldn->listener != NULL) ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); return true; } bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, reboost::shared_buffer_t sub_message, LinkDescriptor* ld ) { // deserialize EndpointDescriptor EndpointDescriptor endpoints; endpoints.deserialize(sub_message); // find link request LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink()); // not found? yes-> drop with error! 
if (ldn == NULL) { logging_error( "No link request pending for " << overlayMsg->getDestinationLink().toString() ); return false; } logging_debug("Handling link reply for " << ldn ) // check if already up if (ldn->up) { logging_warn( "Link already up: " << ldn ); return true; } // debug message logging_info( "Link request reply received. Establishing link" << " for service " << overlayMsg->getService().toString() << " with local id=" << overlayMsg->getDestinationLink() << " and remote link id=" << overlayMsg->getSourceLink() << " to " << endpoints.toString() << " hop count: " << overlayMsg->getRouteRecord().size() ); // set local link descriptor data ldn->up = true; ldn->relayed = true; ldn->service = overlayMsg->getService(); ldn->listener = getListener(ldn->service); ldn->remoteLink = overlayMsg->getSourceLink(); ldn->remoteNode = overlayMsg->getSourceNode(); // update timestamps ldn->setAlive(); ldn->setAutoUsed(); // auto links: link has been accepted -> send queued messages if( ldn->messageQueue.size() > 0 ) { logging_info( "Sending out queued messages on link " << ldn->overlayId.toString() ); foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue ) { sendMessage( msg.message, ldn->overlayId, msg.priority ); } ldn->messageQueue.clear(); } // inform listeners about new link ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); // try to replace relay link with direct link ldn->retryCounter = 3; ldn->endpoint = endpoints; ldn->communicationId = bc->establishLink( ldn->endpoint ); return true; } /// handle a keep-alive message for a link bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink()); if ( rld != NULL ) { logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() ); if (overlayMsg->isRouteRecord()) { rld->routeRecord = overlayMsg->getRouteRecord(); } // set alive rld->setAlive(); /* answer keep alive */ if ( overlayMsg->getType() == 
OverlayMsg::typeKeepAlive ) { time_t now = time(NULL); logging_debug("[BaseOverlay] Answering KeepAlive over " << ld->to_string() << " after " << difftime( now, ld->keepAliveSent ) << "s"); OverlayMsg msg( OverlayMsg::typeKeepAliveReply, OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); msg.setRouteRecord(true); ld->keepAliveSent = now; send_link( &msg, ld->overlayId, system_priority::OVERLAY ); } return true; } else { logging_error("No Keep-Alive for " << overlayMsg->getDestinationLink() << ": link unknown." ); return false; } } /// handle a direct link message bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { logging_debug( "Received direct link replacement request" ); /// get destination overlay link LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() ); if (rld == NULL || ld == NULL) { logging_error("Direct link replacement: Link " << overlayMsg->getDestinationLink() << "not found error." ); return false; } logging_info( "Received direct link convert notification for " << rld ); // update information rld->communicationId = ld->communicationId; rld->communicationUp = true; rld->relayed = false; // mark used and alive! rld->setAlive(); rld->setAutoUsed(); // erase the original descriptor eraseDescriptor(ld->overlayId); // inform listener if( rld->listener != NULL) rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode ); return true; } /// handles an incoming message bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, const LinkID bcLink ) { // decapsulate overlay message OverlayMsg* overlayMsg = new OverlayMsg(); reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message); // // XXX debug // logging_info( "Received overlay message." 
// << " Hops: " << (int) overlayMsg->getNumHops() // << " Type: " << (int) overlayMsg->getType() // << " Payload size: " << sub_buff.size() // << " SeqNum: " << overlayMsg->getSeqNum() ); // increase number of hops overlayMsg->increaseNumHops(); // refresh relay information refreshRelayInformation( overlayMsg, ld ); // update route record overlayMsg->addRouteRecord(nodeId); // handle signaling messages (do not route!) if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); delete overlayMsg; return true; } // message for reached destination? no-> route message if (!overlayMsg->getDestinationNode().isUnspecified() && overlayMsg->getDestinationNode() != nodeId ) { logging_debug("Routing message " << " from " << overlayMsg->getSourceNode() << " to " << overlayMsg->getDestinationNode() ); // // XXX testing AKTUELL // logging_info("MARIO: Routing message " // << " from " << overlayMsg->getSourceNode() // << " to " << overlayMsg->getDestinationNode() ); // logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size()); overlayMsg->append_buffer(sub_buff); route( overlayMsg, ld->remoteNode ); delete overlayMsg; return true; } /* handle base overlay message */ bool ret = false; // return value try { switch ( overlayMsg->getType() ) { // data transport messages case OverlayMsg::typeData: { // NOTE: On relayed links, »ld« does not point to our link, but on the relay link. LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink()); if ( ! 
end_to_end_ld ) { logging_warn("Error: Data-Message claims to belong to a link we don't know."); ret = false; } else { // message received --> link is alive end_to_end_ld->keepAliveReceived = time(NULL); // hop count on this link end_to_end_ld->hops = overlayMsg->getNumHops(); // * call handler * ret = handleData(sub_buff, overlayMsg, end_to_end_ld); } break; } case OverlayMsg::typeMessageLost: ret = handleLostMessage(sub_buff, overlayMsg); break; // overlay setup messages case OverlayMsg::typeJoinRequest: ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break; case OverlayMsg::typeJoinReply: ret = handleJoinReply(sub_buff, bcLink ); break; // link specific messages case OverlayMsg::typeLinkRequest: ret = handleLinkRequest(overlayMsg, ld ); break; case OverlayMsg::typeLinkReply: ret = handleLinkReply(overlayMsg, sub_buff, ld ); break; case OverlayMsg::typeLinkUpdate: ret = handleLinkUpdate(overlayMsg, ld ); break; case OverlayMsg::typeKeepAlive: case OverlayMsg::typeKeepAliveReply: ret = handleLinkAlive(overlayMsg, ld ); break; case OverlayMsg::typeLinkDirect: ret = handleLinkDirect(overlayMsg, ld ); break; case OverlayMsg::typeLinkClose: { dropLink(overlayMsg->getDestinationLink()); __removeDroppedLink(overlayMsg->getDestinationLink()); break; } /// ping over overlay path (or similar) case OverlayMsg::typePing: { ret = handlePing(overlayMsg, ld); break; } case OverlayMsg::typePong: { ret = handlePong(overlayMsg, ld); break; } // handle unknown message type default: { logging_error( "received message in invalid state! don't know " << "what to do with this message of type " << overlayMsg->getType() ); ret = false; break; } } } catch ( reboost::illegal_sub_buffer& e ) { logging_error( "Failed to create sub-buffer while reading message: »" << e.what() << "« Message too short? 
"); assert(false); // XXX } // free overlay message and return value delete overlayMsg; return ret; } // ---------------------------------------------------------------------------- void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) { logging_debug( "broadcasting message to all known nodes " << "in the overlay from service " + service.toString() ); OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true); for(size_t i=0; inodeId) continue; // don't send to ourselfs sendMessage( message, id, priority, service ); } } /// return the overlay neighbors vector BaseOverlay::getOverlayNeighbors(bool deep) const { // the known nodes _can_ also include our node, so we remove ourself vector nodes = overlayInterface->getKnownNodes(deep); vector::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); if( i != nodes.end() ) nodes.erase( i ); return nodes; } const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const { if( lid == LinkID::UNSPECIFIED ) return nodeId; const LinkDescriptor* ld = getDescriptor(lid); if( ld == NULL ) return NodeID::UNSPECIFIED; else return ld->remoteNode; } vector BaseOverlay::getLinkIDs( const NodeID& nid ) const { vector linkvector; foreach( LinkDescriptor* ld, links ) { if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { linkvector.push_back( ld->overlayId ); } } return linkvector; } void BaseOverlay::onNodeJoin(const NodeID& node) { JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node ); if( i == joiningNodes.end() ) return; logging_info( "node has successfully joined baseoverlay and overlay structure " << node.toString() ); joiningNodes.erase( i ); } void BaseOverlay::eventFunction() { stabilizeRelays(); stabilizeLinks(); updateVisual(); } /* link status */ bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const { const LinkDescriptor* ld = getDescriptor(lnk); if (!ld) return false; return ld->communicationUp && 
!ld->relayed; } int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const { const LinkDescriptor* ld = getDescriptor(lnk); if (!ld) return -1; return ld->hops; } bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const { time_t now = time(NULL); return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..? } bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const { return isLinkVital(link) && link->communicationUp && !link->relayed; } /* [link status] */ void BaseOverlay::updateVisual(){ // // update base overlay structure // static NodeID pre = NodeID::UNSPECIFIED; static NodeID suc = NodeID::UNSPECIFIED; vector nodes = this->getOverlayNeighbors(false); if(nodes.size() == 0){ if(pre != NodeID::UNSPECIFIED){ visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, ""); pre = NodeID::UNSPECIFIED; } if(suc != NodeID::UNSPECIFIED){ visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, ""); suc = NodeID::UNSPECIFIED; } } // if(nodes.size() == 0) if(nodes.size() == 1){ // only one node, make this pre and succ // and then go into the node.size()==2 case //nodes.push_back(nodes.at(0)); if(pre != nodes.at(0)){ pre = nodes.at(0); if(pre != NodeID::UNSPECIFIED) visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, ""); } } if(nodes.size() == 2){ // old finger if(nodes.at(0) != pre){ if(pre != NodeID::UNSPECIFIED) visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, ""); pre = NodeID::UNSPECIFIED; } if(nodes.at(1) != suc){ if(suc != NodeID::UNSPECIFIED) visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, ""); suc = NodeID::UNSPECIFIED; } // connect with fingers if(pre == NodeID::UNSPECIFIED){ pre = nodes.at(0); if(pre != NodeID::UNSPECIFIED) visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, ""); } if(suc == NodeID::UNSPECIFIED){ suc = nodes.at(1); if(suc != NodeID::UNSPECIFIED) visualInstance.visConnect(visualIdOverlay, 
this->nodeId, suc, ""); } } //if(nodes.size() == 2) // { // logging_error("================================"); // logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16)); // logging_error("================================"); // if(nodes.size()>= 1){ // logging_error("real pre " << nodes.at(0).toString()); // logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16)); // } // if(nodes.size()>= 2){ // logging_error("real suc " << nodes.at(1).toString()); // logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16)); // } // logging_error("================================"); // if(pre == NodeID::UNSPECIFIED){ // logging_error("pre: unspecified"); // }else{ // unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16); // logging_error("pre: " << prei); // } // if(suc == NodeID::UNSPECIFIED){ // logging_error("suc: unspecified"); // }else{ // unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16); // logging_error("suc: " << suci); // } // logging_error("================================"); // } // // update base communication links // static set linkset; set remotenodes; foreach( LinkDescriptor* ld, links ) { if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue; if (ld->routeRecord.size()>1 && ld->relayed) { for (size_t i=1; irouteRecord.size(); i++) remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] ); } else { remotenodes.insert(ld->remoteNode); } } // which links are old and need deletion? bool changed = false; do{ changed = false; foreach(NodeID n, linkset){ if(remotenodes.find(n) == remotenodes.end()){ visualInstance.visDisconnect(visualIdBase, this->nodeId, n, ""); linkset.erase(n); changed = true; break; } } }while(changed); // which links are new and need creation? 
do{ changed = false; foreach(NodeID n, remotenodes){ if(linkset.find(n) == linkset.end()){ visualInstance.visConnect(visualIdBase, this->nodeId, n, ""); linkset.insert(n); changed = true; break; } } }while(changed); } // ---------------------------------------------------------------------------- std::string BaseOverlay::debugInformation() { std::stringstream s; int i=0; // dump overlay information s << "Long debug info ... [see below]" << endl << endl; s << "--- overlay information ----------------------" << endl; s << overlayInterface->debugInformation() << endl; // dump link state s << "--- link state -------------------------------" << endl; foreach( LinkDescriptor* ld, links ) { s << "link " << i << ": " << ld << endl; i++; } s << endl << endl; return s.str(); } }} // namespace ariba, overlay