// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "BaseOverlay.h" #include #include #include #include #include "ariba/NodeListener.h" #include "ariba/CommunicationListener.h" #include "ariba/SideportListener.h" #include "ariba/overlay/LinkDescriptor.h" #include "ariba/overlay/messages/OverlayMsg.h" #include "ariba/overlay/messages/JoinRequest.h" #include "ariba/overlay/messages/JoinReply.h" #include "ariba/overlay/messages/LinkRequest.h" #include "ariba/overlay/messages/RelayMessage.h" #include "ariba/utility/misc/OvlVis.h" namespace ariba { namespace overlay { #define logging_force(x) std::cout << x << std::endl; #define logging_force1(x) std::cout << x << std::endl; LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { BOOST_FOREACH( LinkDescriptor* lp, links ) if ((communication ? lp->communicationId : lp->overlayId) == link) return lp; return NULL; } const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { BOOST_FOREACH( const LinkDescriptor* lp, links ) if ((communication ? lp->communicationId : lp->overlayId) == link) return lp; return NULL; } LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { BOOST_FOREACH( LinkDescriptor* lp, links ) if (lp->autolink && lp->remoteNode == node && lp->service == service) return lp; return NULL; } void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) { for ( vector::iterator i = links.begin(); i!= links.end(); i++) { LinkDescriptor* ld = *i; if ((communication ? ld->communicationId : ld->overlayId) == link) { delete ld; links.erase(i); break; } } } LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) { LinkDescriptor* desc = getDescriptor( link ); if ( desc == NULL ) { desc = new LinkDescriptor(); desc->overlayId = link; links.push_back(desc); } return desc; } /// returns a direct link relay descriptor to the given relay node LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) { BOOST_FOREACH( LinkDescriptor* lp, links ) if (lp->remoteNode == relayNode && lp->service == OverlayInterface::OVERLAY_SERVICE_ID && lp->relay == false && lp->up) return lp; return NULL; } /// find a proper relay node const NodeID BaseOverlay::findRelayNode( const NodeID& id ) { LinkDescriptor* rld = NULL; NodeID relayNode = NodeID::UNSPECIFIED; // get used next hop towards node LinkID rlid = overlayInterface->getNextLinkId(id); while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) { // get descriptor of first hop rld = getDescriptor(rlid); logging_force1( rld ); // is first hop a relay path? yes-> try to find real link! if ( rld->relay ) relayNode = getRelayDescriptor(rld->localRelay)->remoteNode; // no-> a proper relay node has been found else relayNode = rld->remoteNode; } logging_force1( "Potential relay node " << relayNode.toString() ); // do not return myself or use the node as relay node if (relayNode == nodeId) return NodeID::UNSPECIFIED; else { logging_force1( "Returning relay node " << relayNode.toString() ); return relayNode; } } /// forwards a message over relays/overlay/directly using link descriptor seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) { // directly send message if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) { logging_debug("sendMessage: Sending message via Base Communication"); return bc->sendMessage( ld->communicationId, message ); } // relay message else if ( ld->relay ) { logging_debug("sendMessage: Relaying message to node " << ld->remoteNode.toString() << " using relay " << ld->localRelay ); // get local relay link descriptor and mark as used for relaying LinkDescriptor* rld = getRelayDescriptor(ld->localRelay); if (rld==NULL) { logging_error("sendMessage: Relay descriptor for relay " << ld->localRelay.toString() << " unknown."); return -1; } rld->markAsRelay(); // create a information relay message to inform the relay about OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId ); RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId ); relayMsg.encapsulate( message ); overlay_msg.encapsulate( &relayMsg ); // route message to relay node in order to inform it! logging_debug("sendMessage: Sending message over relayed link with" << ld ); overlayInterface->routeMessage(rld->remoteNode, rld->overlayId, &overlay_msg); return 0; } // route message using overlay else { logging_error("Could not send message descriptor=" << ld ); logging_debug( "sendMessage: Routing message to node " << ld->remoteNode.toString() ); overlayInterface->routeMessage( ld->remoteNode, message ); return 0; } return -1; } /// creates a link descriptor, apply relay semantics if possible LinkDescriptor* BaseOverlay::createLinkDescriptor( const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ) { // find listener if( !communicationListeners.contains( service ) ) { logging_error( "No listener found for service " << service.toString() ); return NULL; } CommunicationListener* listener = communicationListeners.get( service ); assert( listener != NULL ); // copy link id LinkID linkid = link_id; // create link id if necessary if ( linkid.isUnspecified() ) linkid = LinkID::create(); // create relay link descriptor NodeID relayNode = findRelayNode(remoteNode); // add descriptor LinkDescriptor* ld = addDescriptor( linkid ); ld->overlayId = linkid; ld->service = service; ld->listener = listener; ld->remoteNode = remoteNode; // set relay node if available ld->relay = !relayNode.isUnspecified(); ld->localRelay = relayNode; if (!ld->relay) logging_error("No relay found!"); // debug output logging_debug( "Created link descriptor: " << ld ); return ld; } // ---------------------------------------------------------------------------- use_logging_cpp(BaseOverlay); // ---------------------------------------------------------------------------- BaseOverlay::BaseOverlay() : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED), state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT) { } BaseOverlay::~BaseOverlay() { } // ---------------------------------------------------------------------------- void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) { logging_info("Starting..."); // set parameters bc = &_basecomm; nodeId = _nodeid; // register at base communication bc->registerMessageReceiver( this ); bc->registerEventListener( this ); // timer for auto link management Timer::setInterval( 500 ); Timer::start(); } void BaseOverlay::stop() { logging_info("Stopping..."); // stop timer Timer::stop(); // delete oberlay interface if(overlayInterface != NULL) { delete overlayInterface; overlayInterface = NULL; } // unregister at base communication bc->unregisterMessageReceiver( this ); bc->unregisterEventListener( this ); } // ---------------------------------------------------------------------------- void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp) { ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); logging_info( "Starting to join spovnet " << id.toString() << " with nodeid " << nodeId.toString()); // contact the spovnet initiator and request to join. if the join is granted we will // receive further information on the structure of the overlay that is used in the spovnet // but first, we have to establish a link to the initiator... spovnetId = id; state = BaseOverlayStateJoinInitiated; initiatorLink = bc->establishLink( bootstrapEp ); logging_info("join process initiated for " << id.toString() << "..."); } void BaseOverlay::leaveSpoVNet() { logging_info( "Leaving spovnet " << spovnetId ); bool ret = ( state != this->BaseOverlayStateInvalid ); logging_debug( "Dropping all auto-links" ); // gather all service links vector servicelinks; BOOST_FOREACH( LinkDescriptor* ld, links ) { if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) servicelinks.push_back( ld->overlayId ); } // drop all service links BOOST_FOREACH( LinkID lnk, servicelinks ) dropLink( lnk ); // let the node leave the spovnet overlay interface logging_debug( "Leaving overlay" ); if( overlayInterface != NULL ) overlayInterface->leaveOverlay(); // leave spovnet if( state != BaseOverlayStateInitiator ) { // then, leave the spovnet baseoverlay OverlayMsg overMsg( OverlayMsg::typeBye, nodeId ); bc->sendMessage( initiatorLink, &overMsg ); // drop the link and set to correct state bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; } // change to inalid state state = BaseOverlayStateInvalid; ovl.visShutdown( ovlId, nodeId, string("") ); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) { if( ret ) i->onLeaveCompleted( spovnetId ); else i->onLeaveFailed( spovnetId ); } } void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos) { // set the state that we are an initiator, this way incoming messages are // handled correctly logging_info( "creating spovnet " + id.toString() << " with nodeid " << nodeId.toString() ); spovnetId = id; state = BaseOverlayStateInitiator; overlayInterface = OverlayFactory::create( *this, param, nodeId, this ); if( overlayInterface == NULL ) { logging_fatal( "overlay structure not supported" ); state = BaseOverlayStateInvalid; return; } // bootstrap against ourselfs overlayInterface->joinOverlay(); BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinCompleted( spovnetId ); ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN ); } // ---------------------------------------------------------------------------- const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep, const NodeID& nodeid, const ServiceID& service, const LinkID& linkid ) { LinkID link_id = linkid; // establish link via overlay if (!nodeid.isUnspecified()) link_id = establishLink( nodeid, service, link_id ); // establish link directly if only ep is known if (nodeid.isUnspecified()) establishLink( ep, service, link_id ); return link_id; } /// call base communication's establish link and add link mapping const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep, const ServiceID& service, const LinkID& linkid ) { // create a new link id if necessary LinkID link_id = linkid; if (link_id.isUnspecified()) link_id = LinkID::create(); /// find a service listener if( !communicationListeners.contains( service ) ) { logging_error( "No listener registered for service id=" << service.toString() ); return LinkID::UNSPECIFIED; } CommunicationListener* listener = communicationListeners.get( service ); assert( listener != NULL ); /// establish link and add mapping logging_info("Establishing direct link " << link_id.toString() << " using " << ep.toString()); // create descriptor LinkDescriptor* ld = addDescriptor( link_id ); ld->overlayId = link_id; ld->communicationId = link_id; ld->listener = listener; ld->service = service; bc->establishLink( ep, link_id ); return link_id; } /// establishes a link between two arbitrary nodes const LinkID BaseOverlay::establishLink( const NodeID& node, const ServiceID& service, const LinkID& link_id ) { // do not establish a link to myself! if (node == nodeId) return LinkID::UNSPECIFIED; // create a link descriptor LinkDescriptor* ld = createLinkDescriptor( node, service, link_id ); // create link request message with own link id uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL)); LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor(), false, ld->overlayId, ld->localRelay ); OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); overlay_msg.encapsulate( &link_request_msg ); pendingLinks.insert( make_pair(nonce, ld->overlayId) ); // debug message logging_debug( "Sending link request with" << " link id=" << ld->overlayId << " node id=" << ld->remoteNode.toString() << " service id=" << ld->service.toString() << " local relay id=" << ld->localRelay.toString() << " nonce= " << nonce ); // sending message through new link sendMessage( &overlay_msg, ld ); return ld->overlayId; } /// drops an established link void BaseOverlay::dropLink(const LinkID& link) { logging_debug( "Dropping link (initiated locally):" << link.toString() ); // find the link item to drop LinkDescriptor* ld = getDescriptor(link); if( ld == NULL ) { logging_warn( "Can't drop link, link is unknown!"); return; } // delete all queued messages if( ld->messageQueue.size() > 0 ) { logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " << ld->messageQueue.size() << " waiting messages" ); ld->flushQueue(); } // inform sideport and listener ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); // do not drop relay links if (!ld->usedAsRelay) { // drop the link in base communication if (ld->communicationUp) bc->dropLink( ld->communicationId ); // erase descriptor eraseDescriptor( ld->overlayId ); } else ld->dropWhenRelaysLeft = true; } // ---------------------------------------------------------------------------- /// internal send message, always use this functions to send messages over links seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) { logging_debug( "Sending data message on link " << link.toString() ); // get the mapping for this link LinkDescriptor* ld = getDescriptor(link); if( ld == NULL ) { logging_error("Could not send message. " << "Link not found id=" << link.toString()); return -1; } // check if the link is up yet, if its an auto link queue message if( !ld->up ) { ld->markAsUsed(); if( ld->autolink ) { logging_info("Auto-link " << link.toString() << " not up, queue message"); Data data = data_serialize( message ); const_cast(message)->dropPayload(); ld->messageQueue.push_back( new Message(data) ); } else { logging_error("Link " << link.toString() << " not up, drop message"); } return -1; } // compile overlay message (has service and node id) OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId ); overmsg.encapsulate( const_cast(message) ); // send message over relay/direct/overlay return sendMessage( &overmsg, ld ); } seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service) { // find link for node and service LinkDescriptor* ld = getAutoDescriptor( node, service ); // if we found no link, create an auto link if( ld == NULL ) { // debug output logging_info( "No link to send message to node " << node.toString() << " found for service " << service.toString() << ". Creating auto link ..." ); // this will call onlinkup on us, if everything worked we now have a mapping LinkID link = LinkID::create(); // call base overlay to create a link link = establishLink( node, service, link ); ld = getDescriptor( link ); if( ld == NULL ) { logging_error( "Failed to establish auto-link."); return -1; } ld->autolink = true; logging_debug( "Auto-link establishment in progress to node " << node.toString() << " with link id=" << link.toString() ); } assert(ld != NULL); // mark the link as used, as we now send a message through it ld->markAsUsed(); // send / queue message return sendMessage( message, ld->overlayId ); } // ---------------------------------------------------------------------------- const EndpointDescriptor& BaseOverlay::getEndpointDescriptor( const LinkID& link) const { // return own end-point descriptor if( link == LinkID::UNSPECIFIED ) return bc->getEndpointDescriptor(); // find link descriptor. not found -> return unspecified const LinkDescriptor* ld = getDescriptor(link); if (ld==NULL) return EndpointDescriptor::UNSPECIFIED; // return endpoint-descriptor from base communication return bc->getEndpointDescriptor( ld->communicationId ); } const EndpointDescriptor& BaseOverlay::getEndpointDescriptor( const NodeID& node) const { // return own end-point descriptor if( node == nodeId || node == NodeID::UNSPECIFIED ) return bc->getEndpointDescriptor(); // no joined and request remote descriptor? -> fail! if( overlayInterface == NULL ) { logging_error( "overlay interface not set, cannot resolve endpoint" ); return EndpointDescriptor::UNSPECIFIED; } // resolve end-point descriptor from the base-overlay routing table return overlayInterface->resolveNode( node ); } // ---------------------------------------------------------------------------- bool BaseOverlay::registerSidePort(SideportListener* _sideport) { sideport = _sideport; _sideport->configure( this ); } bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) { sideport = &SideportListener::DEFAULT; } // ---------------------------------------------------------------------------- bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) { logging_debug( "binding communication listener " << listener << " on serviceid " << sid.toString() ); if( communicationListeners.contains( sid ) ) { logging_error( "some listener already registered for service id " << sid.toString() ); return false; } communicationListeners.registerItem( listener, sid ); return true; } bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) { logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() ); if( !communicationListeners.contains( sid ) ) { logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() ); return false; } if( communicationListeners.get(sid) != listener ) { logging_warn( "listener bound to service id " << sid.toString() << " is different than listener trying to unbind" ); return false; } communicationListeners.unregisterItem( sid ); return true; } // ---------------------------------------------------------------------------- bool BaseOverlay::bind(NodeListener* listener) { logging_debug( "Binding node listener " << listener ); // already bound? yes-> warning NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i != nodeListeners.end() ) { logging_warn("Node listener " << listener << " is already bound!" ); return false; } // no-> add nodeListeners.push_back( listener ); return true; } bool BaseOverlay::unbind(NodeListener* listener) { logging_debug( "Unbinding node listener " << listener ); // already unbound? yes-> warning NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i == nodeListeners.end() ) { logging_warn( "Node listener " << listener << " is not bound!" ); return false; } // no-> remove nodeListeners.erase( i ); return true; } // ---------------------------------------------------------------------------- void BaseOverlay::onLinkUp(const LinkID& id, const address_v* local, const address_v* remote) { logging_debug( "Link up with base communication link id=" << id ); // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); // handle initiator link if(state == BaseOverlayStateJoinInitiated && id == initiatorLink) { logging_info( "Join has been initiated by me and the link is now up. " << "Sending out join request for SpoVNet " << spovnetId.toString() ); // send join request message OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId ); JoinRequest joinRequest( spovnetId, nodeId ); overlayMsg.encapsulate( &joinRequest ); bc->sendMessage( id, &overlayMsg ); return; } // no link found? -> link establishment from remote, add one! if (ld == NULL) { ld = addDescriptor( id ); logging_debug( "onLinkUp (remote request) descriptor: " << ld ); // update descriptor ld->fromRemote = true; ld->communicationId = id; ld->communicationUp = true; ld->markAsUsed(); // in this case, do not inform listener, since service it unknown // -> wait for update message! // link mapping found? -> send update message with node-id and service id } else { logging_debug( "onLinkUp descriptor (initiated locally):" << ld ); // note: necessary to validate the link on the remote side! logging_debug( "Sending out update" << " for service " << ld->service.toString() << " with local node id " << nodeId.toString() << " on link " << ld->overlayId.toString() ); // update descriptor ld->markAsUsed(); ld->communicationUp = true; // if link is a relayed link ->convert to direct link if (ld->relay) { logging_force( "Converting to direct link: " << ld ); ld->up = true; ld->relay = false; ld->localRelay = NodeID::UNSPECIFIED; OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId ); overMsg.setRelayLink( ld->remoteLinkId ); bc->sendMessage( ld->communicationId, &overMsg ); } // compile and send update message OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); overlayMsg.setAutoLink( ld->autolink ); bc->sendMessage( ld->communicationId, &overlayMsg ); } } void BaseOverlay::onLinkDown(const LinkID& id, const address_v* local, const address_v* remote) { // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); if ( ld == NULL ) return; // not found? ->ignore! logging_force( "onLinkDown descriptor: " << ld ); // inform listeners about link down ld->communicationUp = false; ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId ); // delete all queued messages (auto links) if( ld->messageQueue.size() > 0 ) { logging_warn( "Dropping link " << id.toString() << " that has " << ld->messageQueue.size() << " waiting messages" ); ld->flushQueue(); } // erase mapping eraseDescriptor(ld->overlayId); } void BaseOverlay::onLinkChanged(const LinkID& id, const address_v* oldlocal, const address_v* newlocal, const address_v* oldremote, const address_v* newremote) { // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); if ( ld == NULL ) return; // not found? ->ignore! logging_debug( "onLinkChanged descriptor: " << ld ); // inform listeners ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); // autolinks: refresh timestamp ld->markAsUsed(); } void BaseOverlay::onLinkFail(const LinkID& id, const address_v* local, const address_v* remote) { logging_debug( "Link fail with base communication link id=" << id ); // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); if ( ld == NULL ) return; // not found? ->ignore! logging_debug( "Link failed id=" << ld->overlayId.toString() ); // inform listeners ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); // autolinks: refresh timestamp ld->markAsUsed(); } void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local, const address_v* remote, const QoSParameterSet& qos) { logging_debug( "Link quality changed with base communication link id=" << id ); // get descriptor for link LinkDescriptor* ld = getDescriptor(id, true); if ( ld == NULL ) return; // not found? ->ignore! logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); // autolinks: refresh timestamp ld->markAsUsed(); } bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local, const address_v* remote ) { logging_debug("Accepting link request from " << remote->to_string() ); return true; } /// handles a message from base communication bool BaseOverlay::receiveMessage(const Message* message, const LinkID& link, const NodeID& ) { // get descriptor for link LinkDescriptor* ld = getDescriptor( link, true ); // link known? if (ld == NULL) { // no-> handle with unspecified params logging_debug("Received message from base communication, link descriptor unknown" ); return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED ); } else { // yes -> handle with overlay link id logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() ); return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED ); } } // ---------------------------------------------------------------------------- /// handles a message from an overlay void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) { logging_debug("Received message from overlay -- " << " link id=" << link.toString() << " node id=" << source.toString() ); handleMessage( msg, link, LinkID::UNSPECIFIED, source ); } // ---------------------------------------------------------------------------- /// handles an incoming message bool BaseOverlay::handleMessage( const Message* message, const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) { logging_debug( "Handling message: " << message->toString()); // decapsulate overlay message OverlayMsg* overlayMsg = const_cast(message)->decapsulate(); if( overlayMsg == NULL ) return false; // mark the link as in action LinkDescriptor* ld = getDescriptor(boLink); if (ld == NULL) ld = getDescriptor(bcLink, true); if (ld != NULL) { ld->markAsUsed(); ld->markAlive(); } switch ( overlayMsg->getType() ) { // --------------------------------------------------------------------- // Handle spovnet instance join requests // --------------------------------------------------------------------- case OverlayMsg::typeJoinRequest: { // decapsulate message JoinRequest* joinReq = overlayMsg->decapsulate(); logging_info( "Received join request for spovnet " << joinReq->getSpoVNetID().toString() ); // check spovnet id if( joinReq->getSpoVNetID() != spovnetId ) { logging_error( "Received join request for spovnet we don't handle " << joinReq->getSpoVNetID().toString() ); return false; } // TODO: here you can implement mechanisms to deny joining of a node bool allow = true; logging_info( "Sending join reply for spovnet " << spovnetId.toString() << " to node " << overlayMsg->getSourceNode().toString() << ". Result: " << (allow ? "allowed" : "denied") ); joiningNodes.push_back( overlayMsg->getSourceNode() ); // return overlay parameters assert( overlayInterface != NULL ); logging_debug( "Using bootstrap end-point " << getEndpointDescriptor().toString() ) OverlayParameterSet parameters = overlayInterface->getParameters(); OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId ); JoinReply replyMsg( spovnetId, parameters, allow, getEndpointDescriptor() ); retmsg.encapsulate(&replyMsg); bc->sendMessage( bcLink, &retmsg ); return true; } // --------------------------------------------------------------------- // handle replies to spovnet instance join requests // --------------------------------------------------------------------- case OverlayMsg::typeJoinReply: { // decapsulate message logging_debug("received join reply message"); JoinReply* replyMsg = overlayMsg->decapsulate(); assert(state == BaseOverlayStateJoinInitiated); // correct spovnet? if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail logging_error( "Received SpoVNet join reply for " << replyMsg->getSpoVNetID().toString() << " != " << spovnetId.toString() ); return false; } // access granted? no -> fail if( !replyMsg->getJoinAllowed() ) { logging_error( "Our join request has been denied" ); // drop initiator link bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return true; } // access has been granted -> continue! logging_info("Join request has been accepted for spovnet " << spovnetId.toString() ); // create overlay structure from spovnet parameter set overlayInterface = OverlayFactory::create( *this, replyMsg->getParam(), nodeId, this ); // overlay structure supported? no-> fail! if( overlayInterface == NULL ) { logging_error( "overlay structure not supported" ); bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return true; } // everything ok-> join the overlay! state = BaseOverlayStateCompleted; overlayInterface->createOverlay(); logging_debug( "Using bootstrap end-point " << replyMsg->getBootstrapEndpoint().toString() ); overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); // update ovlvis ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinCompleted( spovnetId ); return true; } // --------------------------------------------------------------------- // handle data forward messages // --------------------------------------------------------------------- case OverlayMsg::typeData: { // get service const ServiceID& service = overlayMsg->getService(); logging_debug( "received data for service " << service.toString() ); // find listener CommunicationListener* listener = communicationListeners.get( service ); if( listener == NULL ) return true; // delegate data message listener->onMessage( overlayMsg, overlayMsg->getSourceNode(), ld->overlayId ); return true; } // --------------------------------------------------------------------- // handle update messages for link establishment // --------------------------------------------------------------------- case OverlayMsg::typeUpdate: { logging_debug("Received type update message on link " << ld ); // get info const NodeID& sourcenode = overlayMsg->getSourceNode(); const ServiceID& service = overlayMsg->getService(); // no link descriptor available -> error! if( ld == NULL ) { logging_warn( "received overlay update message for link " << ld->overlayId.toString() << " for which we have no mapping" ); return false; } // update our link mapping information for this link bool changed = ( ld->remoteNode != sourcenode ) || ( ld->service != service ); ld->remoteNode = sourcenode; ld->service = service; ld->autolink = overlayMsg->isAutoLink(); // if our link information changed, we send out an update, too if( changed ) { OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); overMsg.setAutoLink(ld->autolink); bc->sendMessage( ld->communicationId, &overMsg ); } // service registered? no-> error! if( !communicationListeners.contains( service ) ) { logging_warn( "Link up: event listener has not been registered" ); return false; } // default or no service registered? CommunicationListener* listener = communicationListeners.get( service ); if( listener == NULL || listener == &CommunicationListener::DEFAULT ) { logging_warn("Link up: event listener is default or null!" ); return true; } // update descriptor ld->listener = listener; ld->markAsUsed(); ld->markAlive(); // ask the service whether it wants to accept this link if( !listener->onLinkRequest(sourcenode) ) { logging_debug("Link id=" << ld->overlayId.toString() << " has been denied by service " << service.toString() << ", dropping link"); // prevent onLinkDown calls to the service ld->listener = &CommunicationListener::DEFAULT; // drop the link dropLink( ld->overlayId ); return true; } // set link up ld->up = true; logging_debug( "Link " << ld->overlayId.toString() << " has been accepted by service " << service.toString() << " and is now up" ); // auto links: link has been accepted -> send queued messages if( ld->messageQueue.size() > 0 ) { logging_info( "sending out queued messages on link " << ld->overlayId.toString() ); BOOST_FOREACH( Message* msg, ld->messageQueue ) { sendMessage( msg, ld->overlayId ); delete msg; } ld->messageQueue.clear(); } // call the notification functions listener->onLinkUp( ld->overlayId, sourcenode ); sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId ); return true; } // --------------------------------------------------------------------- // handle bye messages // --------------------------------------------------------------------- case OverlayMsg::typeBye: { logging_debug( "received bye message from " << overlayMsg->getSourceNode().toString() ); /* if we are the initiator and receive a bye from a node * the node just left. if we are a node and receive a bye * from the initiator, we have to close, too. */ if( overlayMsg->getSourceNode() == spovnetInitiator ) { bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; logging_fatal( "initiator ended spovnet" ); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onLeaveFailed( spovnetId ); } else { // a node that said goodbye and we are the initiator don't have to // do much here, as the node also will go out of the overlay // structure logging_info( "node left " << overlayMsg->getSourceNode() ); } return true; } // --------------------------------------------------------------------- // handle link request forwarded through the overlay // --------------------------------------------------------------------- case OverlayMsg::typeLinkRequest: { // decapsulate message LinkRequest* linkReq = overlayMsg->decapsulate(); const ServiceID& service = overlayMsg->getService(); // is request reply? if ( linkReq->isReply() ) { // find link PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() ); if ( i == pendingLinks.end() ) { logging_error( "Nonce not found in link request" ); return true; } // debug message logging_debug( "Link request reply received. Establishing link " << i->second << " to " << (linkReq->getEndpoint()->toString()) << " for service " << service.toString() << " with nonce " << linkReq->getNonce() << " using relay " << linkReq->getRelay().toString() << " and remote link id=" << linkReq->getRemoteLinkId() ); // get descriptor LinkDescriptor* ldn = getDescriptor(i->second); // check if link request reply has a relay node ... if (!linkReq->getRelay().isUnspecified()) { // yes-> ldn->up = true; ldn->relay = true; if (ldn->localRelay.isUnspecified()) { logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn ); showLinkState(); } ldn->remoteRelay = linkReq->getRelay(); ldn->remoteLinkId = linkReq->getRemoteLinkId(); ldn->remoteNode = overlayMsg->getSourceNode(); ldn->markAlive(); // compile and send update message OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId ); _overlayMsg.setAutoLink(ldn->autolink); sendMessage( &_overlayMsg, ldn ); // auto links: link has been accepted -> send queued messages if( ldn->messageQueue.size() > 0 ) { logging_info( "Sending out queued messages on link " << ldn->overlayId.toString() ); BOOST_FOREACH( Message* msg, ldn->messageQueue ) { sendMessage( msg, ldn->overlayId ); delete msg; } ldn->messageQueue.clear(); } ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); // try to establish a direct link ldn->communicationId = bc->establishLink( *linkReq->getEndpoint(), i->second ); } // no relay node-> use overlay routing else { ldn->up = true; // establish direct link ldn->communicationId = bc->establishLink( *linkReq->getEndpoint(), i->second ); } } else { logging_debug( "Link request received from node id=" << overlayMsg->getSourceNode() ); // create link descriptor LinkDescriptor* ldn = createLinkDescriptor(overlayMsg->getSourceNode(), overlayMsg->getService(), LinkID::UNSPECIFIED ); assert(!ldn->overlayId.isUnspecified()); // create reply message OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); LinkRequest link_request_msg( linkReq->getNonce(), &bc->getEndpointDescriptor(), true, ldn->overlayId, ldn->localRelay ); overlay_msg.encapsulate( &link_request_msg ); // debug message logging_debug( "Sending LinkRequest reply for link with nonce " << linkReq->getNonce() ); // if this is a relay link-> update information & inform listeners if (!linkReq->getRelay().isUnspecified()) { // set flags ldn->up = true; ldn->relay = true; if (ldn->localRelay.isUnspecified()) { logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn ); showLinkState(); } ldn->remoteRelay = linkReq->getRelay(); ldn->remoteNode = overlayMsg->getSourceNode(); ldn->remoteLinkId = linkReq->getRemoteLinkId(); ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); } // route message back over overlay sendMessage( &overlay_msg, ldn ); } return true; } // --------------------------------------------------------------------- // handle relay message to forward messages // --------------------------------------------------------------------- case OverlayMsg::typeRelay: { // decapsulate message RelayMessage* relayMsg = overlayMsg->decapsulate(); // is relay message informative? switch (relayMsg->getType()) { // handle relay notification case RelayMessage::typeInform: { logging_info("Received relay information message with" << " relay " << relayMsg->getRelayNode() << " destination " << relayMsg->getDestNode() ); // mark incoming link as relay if (ld!=NULL) ld->markAsRelay(); // am I the destination of this message? yes-> if (relayMsg->getDestNode() == nodeId ) { // deliver relay message locally! logging_debug("Relay message reached destination. Handling the message."); handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode ); return true; } // create route message OverlayMsg _overMsg( *overlayMsg ); RelayMessage _relayMsg( *relayMsg ); _relayMsg.setType( RelayMessage::typeRoute ); _overMsg.encapsulate( &_relayMsg ); // forward message if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) { logging_info("Routing relay message to " << relayMsg->getDestNode().toString() ); overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg ); } else { logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() ); overlayInterface->routeMessage(relayMsg->getRelayNode(), &_overMsg ); } return true; } // handle relay routing case RelayMessage::typeRoute: { logging_info("Received relay route message with" << " relay " << relayMsg->getRelayNode() << " destination " << relayMsg->getDestNode() ); // mark incoming link as relay if (ld!=NULL) ld->markAsRelay(); // am I the destination of this message? yes-> if (relayMsg->getDestNode() == nodeId ) { // deliver relay message locally! logging_debug("Relay message reached destination. Handling the message."); handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode ); return true; } // am I the relay for this message? yes-> if (relayMsg->getRelayNode() == nodeId ) { logging_debug("I'm the relay for this message. Sending to destination."); OverlayMsg _overMsg( *overlayMsg ); RelayMessage _relayMsg( *relayMsg ); _overMsg.encapsulate(&_relayMsg); /// this must be handled by using relay link! overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg ); return true; } // error: I'm not a relay or destination! logging_error("This node is neither relay nor destination. Dropping Message!"); return true; } default: { logging_error("RelayMessage Unknown!"); return true; } } break; } // --------------------------------------------------------------------- // handle keep-alive messages // --------------------------------------------------------------------- case OverlayMsg::typeKeepAlive: { if ( ld != NULL ) { //logging_force("Keep-Alive for "<< ld->overlayId); ld->markAlive(); } break; } // --------------------------------------------------------------------- // handle direct link replacement messages // --------------------------------------------------------------------- case OverlayMsg::typeDirectLink: { LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() ); logging_force( "Received direct link convert notification for " << rld ); rld->communicationId = ld->communicationId; rld->communicationUp = true; rld->relay = false; rld->localRelay = NodeID::UNSPECIFIED; rld->remoteRelay = NodeID::UNSPECIFIED; eraseDescriptor(ld->overlayId); break; } // --------------------------------------------------------------------- // handle unknown message type // --------------------------------------------------------------------- default: { logging_error( "received message in invalid state! don't know " << "what to do with this message of type " << overlayMsg->getType() ); return false; } } /* switch */ return false; } // ---------------------------------------------------------------------------- void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) { logging_debug( "broadcasting message to all known nodes " << "in the overlay from service " + service.toString() ); OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(); OverlayInterface::NodeList::iterator i = nodes.begin(); for(; i != nodes.end(); i++ ) { if( *i == nodeId) continue; // don't send to ourselfs sendMessage( message, *i, service ); } } vector BaseOverlay::getOverlayNeighbors() const { // the known nodes _can_ also include our node, so we remove ourself vector nodes = overlayInterface->getKnownNodes(); vector::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); if( i != nodes.end() ) nodes.erase( i ); return nodes; } const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const { if( lid == LinkID::UNSPECIFIED ) return nodeId; const LinkDescriptor* ld = getDescriptor(lid); if( ld == NULL ) return NodeID::UNSPECIFIED; else return ld->remoteNode; } vector BaseOverlay::getLinkIDs( const NodeID& nid ) const { vector linkvector; BOOST_FOREACH( LinkDescriptor* ld, links ) { if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { linkvector.push_back( ld->overlayId ); } } return linkvector; } void BaseOverlay::onNodeJoin(const NodeID& node) { JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node ); if( i == joiningNodes.end() ) return; logging_info( "node has successfully joined baseoverlay and overlay structure " << node.toString() ); joiningNodes.erase( i ); } void BaseOverlay::eventFunction() { // send keep-alive messages over established links BOOST_FOREACH( LinkDescriptor* ld, links ) { if (!ld->up) continue; OverlayMsg overMsg( OverlayMsg::typeKeepAlive, OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); sendMessage( &overMsg, ld ); } // iterate over all links and check for time boundaries vector oldlinks; time_t now = time(NULL); BOOST_FOREACH( LinkDescriptor* ld, links ) { // remote used as relay flag if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10) ld->usedAsRelay = false; // keep alives missed? yes-> if ( !ld->up && difftime( now, ld->keepAliveTime ) > 2 ) { // increase counter ld->keepAliveMissed++; // missed more than four keep-alive messages (4 sec)? -> drop link if (ld->keepAliveMissed > 10) { logging_force( "Link connection request is stale, closing: " << ld ); oldlinks.push_back( ld ); } } if (!ld->up) continue; // drop links that are dropped and not used as relay if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) oldlinks.push_back( ld ); else // auto-link time exceeded? if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) oldlinks.push_back( ld ); else // keep alives missed? yes-> if ( !ld->autolink && difftime( now, ld->keepAliveTime ) > 2 ) { // increase counter ld->keepAliveMissed++; // missed more than four keep-alive messages (4 sec)? -> drop link if (ld->keepAliveMissed >= 8) { logging_force( "Link is stale, closing: " << ld ); oldlinks.push_back( ld ); } } } // show link state counter++; if (counter>=4) showLinkState(); if (counter>=4 || counter<0) counter = 0; // drop links BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) { if (!ld->communicationId.isUnspecified() && ld->communicationId == initiatorLink) { logging_force( "Not dropping initiator link: " << ld ); continue; } logging_force( "Link timed out. Dropping " << ld ); dropLink( ld->overlayId ); } } void BaseOverlay::showLinkState() { int i=0; logging_force("--- link state -------------------------------"); BOOST_FOREACH( LinkDescriptor* ld, links ) { logging_force("link " << i << ": " << ld); i++; } logging_force("----------------------------------------------"); } }} // namespace ariba, overlay