// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "BaseOverlay.h" #include "ariba/utility/misc/OvlVis.h" #include "ariba/NodeListener.h" #include "ariba/CommunicationListener.h" #include "ariba/SideportListener.h" #include "ariba/overlay/messages/OverlayMsg.h" #include "ariba/overlay/messages/JoinRequest.h" #include "ariba/overlay/messages/JoinReply.h" #include "ariba/overlay/messages/LinkRequest.h" namespace ariba { namespace overlay { use_logging_cpp(BaseOverlay); BaseOverlay::BaseOverlay() : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED), state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT){ } BaseOverlay::~BaseOverlay(){ } void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){ bc = &_basecomm; nodeId = _nodeid; logging_info("creating base overlay"); bc->registerMessageReceiver( this ); bc->registerEventListener( this ); ovl.visCreate( ovlId, nodeId, string(""), string("") ); ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY); // if (Identifier(Configuration::instance().read("BASE_nodeid")) == // Identifier(Configuration::instance().read("SOURCE"))) { // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA); // } else if (Identifier(Configuration::instance().read("BASE_nodeid")) == // Identifier(Configuration::instance().read("MR_A"))) { // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A); // } else if (Identifier(Configuration::instance().read("BASE_nodeid")) == // Identifier(Configuration::instance().read("MR_W"))) { // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W); // } // timer for auto link management Timer::setInterval( 5000 ); Timer::start(); } void BaseOverlay::stop() { logging_info("deleting base overlay"); Timer::stop(); bc->unregisterMessageReceiver( this ); bc->unregisterEventListener( this ); if(overlayInterface != NULL){ delete overlayInterface; overlayInterface = NULL; } } void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){ ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); logging_info( "starting to join spovnet " << id.toString() << " with nodeid " << nodeId.toString()); // // contact the spovnet initiator and request // to join. if the join is granted we will // receive further information on the structure // of the overlay that is used in the spovnet // // but first, we have to establish a link to the initiator... // spovnetId = id; state = BaseOverlayStateJoinInitiated; initiatorLink = bc->establishLink( bootstrapEp ); logging_info("join process initiated for " << id.toString() << "..."); } void BaseOverlay::leaveSpoVNet(){ logging_info( "leaving spovnet " << spovnetId ); bool ret = ( state != this->BaseOverlayStateInvalid ); logging_debug( "dropping all auto-links ..." ); // now we start leaving the spovnet: fist delete all links // that we still have in the baseoverlay initiated by // some services, the leave the actual overlay structure, // then leave the spovnet // --> drop all service links vector servicelinks; BOOST_FOREACH( LinkPair item, linkMapping ){ if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID ) servicelinks.push_back( item.first ); } BOOST_FOREACH( LinkID lnk, servicelinks ){ // the dropLink function will remove // the item from the linkMapping dropLink( lnk ); } // --> leave overlay structure logging_debug( "leaving overlay" ); // first, leave the overlay interface if( overlayInterface != NULL ) overlayInterface->leaveOverlay(); // --> leave spovnet if( state != BaseOverlayStateInitiator ){ // then, leave the spovnet baseoverlay OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId ); bc->sendMessage( initiatorLink, &overMsg ); // drop the link and set to correct state bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; } state = BaseOverlayStateInvalid; ovl.visShutdown( ovlId, nodeId, string("") ); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ){ if( ret ) i->onLeaveCompleted( spovnetId ); else i->onLeaveFailed( spovnetId ); } } void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){ // set the state that we are an initiator, this way incoming messages are // handled correctly logging_info( "creating spovnet " + id.toString() << " with nodeid " << nodeId.toString() ); spovnetId = id; state = BaseOverlayStateInitiator; overlayInterface = OverlayFactory::create( *this, param, nodeId, this ); if( overlayInterface == NULL ){ logging_fatal( "overlay structure not supported" ); state = BaseOverlayStateInvalid; return; } // bootstrap against ourselfs overlayInterface->joinOverlay(); BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinCompleted( spovnetId ); ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); } /// establishes a link between two arbitrary nodes const LinkID BaseOverlay::establishLink( const NodeID& node, const ServiceID& service, const LinkID& link_id ) { if( !communicationListeners.contains( service ) ){ logging_error( "no registered listener on serviceid " << service.toString() ); return LinkID::UNSPECIFIED; } // copy link id LinkID linkid = link_id; // create link id if necessary if (linkid.isUnspecified()) linkid = LinkID::create(); // debug message logging_debug( "BaseOverlay called to establish link between node " << node.toString() << " for service " << service.toString() ); // create link request message with own link id OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId ); uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL)); LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() ); overlay_msg.encapsulate( &link_request_msg ); pendingLinks.insert( make_pair(nonce, linkid) ); // debug message logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() ); // route message to overlay node overlayInterface->routeMessage( node, &overlay_msg ); CommunicationListener* receiver = communicationListeners.get( service ); assert( receiver != NULL ); LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver); linkMapping.insert( make_pair(linkid, item) ); return linkid; } const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep, const ServiceID& service, const LinkID& linkid ){ if( !communicationListeners.contains( service ) ){ logging_error( "no registered listener on serviceid " << service.toString() ); return LinkID::UNSPECIFIED; } const LinkID link = bc->establishLink( ep, linkid ); CommunicationListener* receiver = communicationListeners.get( service ); assert( receiver != NULL ); LinkItem item (link, NodeID::UNSPECIFIED, service, receiver); linkMapping.insert( make_pair(link, item) ); return link; } void BaseOverlay::dropLink(const LinkID& link){ logging_debug( "baseoverlay dropping link " << link.toString() ); LinkMapping::iterator i = linkMapping.find( link ); // find the link item to drop if( i == linkMapping.end() ){ logging_warn( "can't drop link, mapping unknown " << link.toString() ); return; } LinkItem item = i->second; // delete all queued messages if( item.waitingmsg.size() > 0 ){ logging_warn( "dropping link " << link.toString() << " that has " << item.waitingmsg.size() << " waiting messages" ); item.deleteWaiting(); } // erase the mapping and drop the link linkMapping.erase( i ); bc->dropLink( link ); // tell sideports and listeners of the drop item.interface->onLinkDown( link, item.node ); sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId ); } seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){ logging_debug( "baseoverlay is sending data message on link " << link.toString() ); // // get the mapping for this link // LinkMapping::iterator i = linkMapping.find( link ); if( i == linkMapping.end() ){ logging_error( "could not send message. link not found " << link.toString() ); return -1; } i->second.markused(); // // check if the link is up yet, if its an autlink queue message // if( !i->second.linkup ){ if( i->second.autolink ){ logging_info( "auto link " << link.toString() << " is not up yet, queueing message" ); Data data = data_serialize( message ); const_cast(message)->dropPayload(); i->second.waitingmsg.push_back( new Message(data) ); } else { logging_error("link " << link.toString() << " is not up yet, dropping message" ); } return -1; } // // send the message through the basecomm // OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId ); overmsg.encapsulate( const_cast(message) ); return bc->sendMessage( link, &overmsg ); } seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){ LinkID link = LinkID::UNSPECIFIED; LinkMapping::iterator i = linkMapping.begin(); LinkMapping::iterator iend = linkMapping.end(); // // see if we find a link for this node and service destination // for( ; i != iend; i++ ){ if( i->second.node == node && i->second.service == service ){ link = i->second.link; break; } } // // if we found no link, create an auto link // if( link == LinkID::UNSPECIFIED ){ logging_info( "no link could be found to send message to node " << node.toString() << " for service " << service.toString() << ". creating auto link ..."); // call basecomm to create a link link = establishLink( node, service ); // this will call onlinkup on us, if everything worked we now have a mapping LinkMapping::iterator i = linkMapping.find( link ); i->second.autolink = true; if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){ logging_error( "failed to establish auto link to node " << node.toString() << " for service " << service.toString() ); return -1; } logging_debug( "establishing autolink in progress to node " << node.toString() << " with new link-id " << link.toString() ); } // if( link != LinkID::UNSPECIFIED ) assert( link != LinkID::UNSPECIFIED ); // mark the link as used, as we // now send a message through it i->second.markused(); // send the message through the new link. the link may not be functional, // but for us there is a link-id so we can send messages through it. if // the link is not yet up and the message needs to be cached, this is the // task of the BaseCommunication, it will cache and send it later. return sendMessage( message, link ); } const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const { return bc->getEndpointDescriptor( link ); } const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const { if( node == nodeId || node == NodeID::UNSPECIFIED ) return bc->getEndpointDescriptor(); if( overlayInterface == NULL ){ logging_error( "overlay interface not set, cannot resolve endpoint" ); return EndpointDescriptor::UNSPECIFIED; } // TODO: if this is not a onehop overlay the operation will go asynchronously return overlayInterface->resolveNode( node ); } bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){ logging_debug( "binding communication listener " << listener << " on serviceid " << sid.toString() ); if( communicationListeners.contains( sid ) ){ logging_error( "some listener already registered for service id " << sid.toString() ); return false; } communicationListeners.registerItem( listener, sid ); return true; } bool BaseOverlay::registerSidePort(SideportListener* _sideport){ sideport = _sideport; _sideport->configure( this ); } bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){ sideport = &SideportListener::DEFAULT; } bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){ logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() ); if( !communicationListeners.contains( sid ) ){ logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() ); return false; } if( communicationListeners.get(sid) != listener ){ logging_warn( "listener bound to service id " << sid.toString() << " is different than listener trying to unbind" ); return false; } communicationListeners.unregisterItem( sid ); return true; } bool BaseOverlay::bind(NodeListener* listener){ logging_debug( "binding node listener " << listener ); NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i != nodeListeners.end() ){ logging_warn( "node listener " << listener << " is already bound, cannot bind" ); return false; } nodeListeners.push_back( listener ); return true; } bool BaseOverlay::unbind(NodeListener* listener){ logging_debug( "unbinding node listener " << listener ); NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); if( i == nodeListeners.end() ){ logging_warn( "node listener " << listener << " is not bound, cannot unbind" ); return false; } nodeListeners.erase( i ); return true; } void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ logging_debug( "base overlay received linkup event " + id.toString() ); // TODO: updateOvlVis( getNodeID(id) ); // // if we get up a link while we are in the // join phase and this is the link that // we have initiated towards the spovnet owner // continue the join process by sending // a join request message through the link // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){ logging_info( "Join has been initiated by me and the link is now up. " << "sending out join request for SpoVNet " << spovnetId.toString() ); OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId ); JoinRequest joinmsg( spovnetId, nodeId ); overMsg.encapsulate( &joinmsg ); state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated bc->sendMessage( id, &overMsg ); return; } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink) // // otherwise this is a link initiated by a service // then we exchange update messages to exchange the // service id and node id for the link. in this case // we should have a link mapping for this link. if // we have no link mapping this link was initiated by // the remote side. // LinkMapping::iterator i = linkMapping.find( id ); if( i == linkMapping.end() ){ LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT ); linkMapping.insert( make_pair(id, item) ); } else { logging_debug( "sending out OverlayMessageTypeUpdate" << " for service " << i->second.service.toString() << " with local node id " << nodeId.toString() << " on link " << id.toString() ); OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId ); bc->sendMessage( id, &overMsg ); i->second.markused(); } // if( i == linkMapping.end() ) // the link is only valid for the service when we receive // the OverlayMessageTypeUpdate from the remote node and // have the nodeid and serviceid for the link! } void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ logging_debug( "link went down " << id.toString() ); // // tell the service that the link went // down and remove the mapping // LinkMapping::iterator i = linkMapping.find( id ); if( i == linkMapping.end() ) { // this can also be one of the baseoverlay links that // no mapping is stored for. therefore we issue no warning. // it can also be a link that has been dropped and the // mapping is already deleted in the dropLink function. // also, the service notification is issued then in dropLink return; } i->second.interface->onLinkDown( id, i->second.node ); sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId ); // delete all queued messages if( i->second.waitingmsg.size() > 0 ){ logging_warn( "dropping link " << id.toString() << " that has " << i->second.waitingmsg.size() << " waiting messages" ); i->second.deleteWaiting(); } linkMapping.erase( i ); } void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){ logging_debug( "link changed " << id.toString() ); // // tell the service that the link changed // LinkMapping::iterator i = linkMapping.find( id ); if( i == linkMapping.end() ) return; i->second.interface->onLinkChanged( id, i->second.node ); sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId ); // TODO call onLinkQoSChanged? i->second.markused(); } void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ logging_debug( "link failed " << id.toString() ); // // tell the service that the link failed // LinkMapping::iterator i = linkMapping.find( id ); if( i == linkMapping.end() ) return; i->second.interface->onLinkFail( id, i->second.node ); sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId ); i->second.markused(); } void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) { logging_debug( "link qos changed " << id.toString() ); // // tell the service that the link qos has changed // LinkMapping::iterator i = linkMapping.find( id ); if( i == linkMapping.end() ) return; // TODO: convert QoSParameterSet to the LinkProperties properties // TODO: currently not in the interface: i->second.interface->onLinkQoSChanged( id, i->second.node, LinkProperties::DEFAULT ); i->second.markused(); } bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){ // also see in the receiveMessage function. there the higher layer service // is asked whether to accept link requests, but there a basic link association is // already built up, so we know the node id logging_debug("received link request from " << remote->toString() << ", accepting"); return true; } bool BaseOverlay::receiveMessage(const Message* message, const LinkID& link, const NodeID& /*the nodeid is invalid in this case! removed var to prevent errors*/ ){ // decapsulate overlay message logging_debug( "receiveMessage: " << message->toString()); OverlayMsg* overlayMsg = const_cast(message)->decapsulate(); if( overlayMsg == NULL ) return false; // mark the link as in action LinkMapping::iterator item = linkMapping.find( link ); if( item != linkMapping.end() ) item->second.markused(); /* ************************************************************************ /* handle user date that we forward to the appropriate service using the * service id in the message. as we don't know the class of message that * the service handles, we forward it as a pure Message */ if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) { logging_debug( "baseoverlay received message of type OverlayMessageTypeData" ); const ServiceID& service = overlayMsg->getService(); CommunicationListener* serviceListener = communicationListeners.get( service ); logging_debug( "received data for service " << service.toString() ); if( serviceListener != NULL ) serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link ); return true; } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) /* ************************************************************************ /* Handle spovnet instance join requests */ else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) ){ logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinRequest" ); JoinRequest* joinReq = overlayMsg->decapsulate(); logging_info( "received join request for spovnet " << joinReq->getSpoVNetID().toString() ); /* make sure that the node actually wants to join * the correct spovnet id that we administrate */ if( joinReq->getSpoVNetID() != spovnetId ){ logging_error( "received join request for spovnet we don't handle " << joinReq->getSpoVNetID().toString() ); return false; } // // only if all services allow the node to join it is allowed // using the isJoinAllowed interface security policies can be // implemented by higher layer services // // TODO: here you can implement mechanisms to deny joining of a node bool allow = true; logging_info( "sending back join reply for spovnet " << spovnetId.toString() << " to node " << overlayMsg->getSourceNode().toString() << ". result: " << (allow ? "allowed" : "denied") ); joiningNodes.push_back( overlayMsg->getSourceNode() ); // // send back our spovnetid, default overlay parameters, join allow // result, and ourself as the end-point to bootstrap the overlay against // assert( overlayInterface != NULL ); OverlayParameterSet parameters = overlayInterface->getParameters(); OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId ); JoinReply replyMsg( spovnetId, parameters, allow, getEndpointDescriptor() ); retmsg.encapsulate(&replyMsg); bc->sendMessage( link, &retmsg ); return true; } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest)) /* ************************************************************************ * handle replies to spovnet instance join requests */ else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated){ logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinReply"); JoinReply* replyMsg = overlayMsg->decapsulate(); logging_info( "received spovnet join reply" ); // ensure that we actually wanted to get into the spovnet whose id is // in the message if( replyMsg->getSpoVNetID() != spovnetId ){ logging_error( "received spovnet join reply for spovnet " << replyMsg->getSpoVNetID().toString() << " but we wanted to join spovnet " << spovnetId.toString() ); // state does not change here, maybe the reply does come in later return false; } // if we did not get access to the spovnet notify of the failure and // close the link to the initiator if( ! replyMsg->getJoinAllowed() ){ logging_error( "our join request has been denied" ); bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ){ i->onJoinFailed( spovnetId ); } return true; } logging_info( "join request has been accepted for spovnet " << spovnetId.toString() ); // if we did get access to the spovnet we try to create the overlay // structure as given in the reply message overlayInterface = OverlayFactory::create( *this, replyMsg->getParam(), nodeId, this ); if( overlayInterface == NULL ){ logging_error( "overlay structure not supported" ); bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ) i->onJoinFailed( spovnetId ); return true; } /* now start the join process for the overlay. the join process for the * spovnet baseoverlay is now complete. we use the endpoint for overlay * structure bootstrapping that the initiator provided in his reply * message */ state = BaseOverlayStateCompleted; ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); overlayInterface->createOverlay(); overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ){ i->onJoinCompleted( spovnetId ); } return true; } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated) /* ************************************************************************ * handle update messages for link establishment */ else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){ logging_debug( "baseoverlay received message of type OverlayMessageTypeUpdate" ); const NodeID& sourcenode = overlayMsg->getSourceNode(); const ServiceID& service = overlayMsg->getService(); // linkmapping for the link available? no-> ignore LinkMapping::iterator i = linkMapping.find( link ); if( i == linkMapping.end() ) { logging_warn( "received overlay update message for link " << link.toString() << " for which we have no mapping" ); return false; } // update our link mapping information for this link bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service ); i->second.node = sourcenode; i->second.service = service; // if our link information changed, we send out an update, too if( changed ){ OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId ); bc->sendMessage( link, &overMsg ); } // set the correct listener service for the linkitem // now we can tell the registered service of the linkup event if( !communicationListeners.contains( service ) ){ logging_warn( "linkup event for service that has not been registered" ); return false; } CommunicationListener* iface = communicationListeners.get( service ); if( iface == NULL || iface == &CommunicationListener::DEFAULT ){ logging_warn( "linkup event for service that has been registered " "with a NULL interface" ); return true; } i->second.interface = iface; i->second.markused(); // ask the service whether it wants to accept this link if( !iface->onLinkRequest(sourcenode) ){ logging_debug("link " << link.toString() << " has been denied by service " << service.toString() << ", dropping link"); // prevent onLinkDown calls to the service i->second.interface = &CommunicationListener::DEFAULT; // drop the link dropLink( link ); return true; } // // link has been accepted, link is now up, send messages out first // i->second.linkup = true; logging_debug("link " << link.toString() << " has been accepted by service " << service.toString() << " and is now up"); if( i->second.waitingmsg.size() > 0 ){ logging_info( "sending out queued messages on link " << link.toString() ); BOOST_FOREACH( Message* msg, i->second.waitingmsg ){ sendMessage( msg, link ); delete msg; } i->second.waitingmsg.clear(); } // call the notification functions iface->onLinkUp( link, sourcenode ); sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId ); return true; } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ) /* ************************************************************************ * handle bye messages */ else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) { logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" ); logging_debug( "Received bye message from " << overlayMsg->getSourceNode().toString() ); /* if we are the initiator and receive a bye from a node * the node just left. if we are a node and receive a bye * from the initiator, we have to close, too. */ if( overlayMsg->getSourceNode() == spovnetInitiator ){ bc->dropLink( initiatorLink ); initiatorLink = LinkID::UNSPECIFIED; state = BaseOverlayStateInvalid; logging_fatal( "initiator ended spovnet" ); // inform all registered services of the event BOOST_FOREACH( NodeListener* i, nodeListeners ){ i->onLeaveFailed( spovnetId ); } } else { // a node that said goodbye and we are the initiator don't have to // do much here, as the node also will go out of the overlay // structure logging_info( "node left " << overlayMsg->getSourceNode() ); } return true; } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye)) /* ************************************************************************ * handle link request forwarded through the overlay */ else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) { LinkRequest* linkReq = overlayMsg->decapsulate(); const ServiceID& service = overlayMsg->getService(); if (linkReq->isReply()) { // find link PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() ); if ( i == pendingLinks.end() ) { logging_error( "Nonce not found in link request" ); return true; } // debug message logging_debug( "LinkRequest reply received. Establishing link " << i->second << " to " << (linkReq->getEndpoint()->toString()) << " for service " << service.toString() << " with nonce " << linkReq->getNonce() ); // establishing link bc->establishLink( *linkReq->getEndpoint(), i->second ); } else { OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId ); LinkRequest link_request_msg( linkReq->getNonce(), &bc->getEndpointDescriptor(), true ); overlay_msg.encapsulate( &link_request_msg ); // debug message logging_debug( "Sending LinkRequest reply for link with nonce " << linkReq->getNonce() ); // route message back over overlay overlayInterface->routeMessage( overlayMsg->getSourceNode(), &overlay_msg ); } } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) /* ************************************************************************ * unknown message type ... error! */ else { logging_error( "received message in invalid state! don't know " << "what to do with this message of type " << overlayMsg->getType() ); return false; } // else return false; } void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){ logging_debug( "broadcasting message to all known nodes " << "in the overlay from service " + service.toString() ); OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(); OverlayInterface::NodeList::iterator i = nodes.begin(); OverlayInterface::NodeList::iterator iend = nodes.end(); for( ; i != iend; i++ ){ if( *i == nodeId) continue; // don't send to ourselfs sendMessage( message, *i, service ); } } vector BaseOverlay::getOverlayNeighbors() const { // the known nodes _can_ also include our // node, so we remove ourselfs vector nodes = overlayInterface->getKnownNodes(); vector::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); if( i != nodes.end() ) nodes.erase( i ); return nodes; } void BaseOverlay::updateOvlVis( const NodeID& n ) { NodeID node = n; /* void visShowNodeBubble ( NETWORK_ID network, NodeID& node, string label ); */ using namespace std; if (node == nodeId || node.isUnspecified()) return; // min/max if ( node < min || min.isUnspecified() ) min = node; if ( node > max || max.isUnspecified() ) max = node; // successor if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) { if (!succ.isUnspecified() && node != succ) ovl.visDisconnect(ovlId, nodeId, succ, string("")); succ = node; ovl.visConnect(ovlId, nodeId, succ, string("")); } // set successor (circle-wrap) if (succ.isUnspecified() && !min.isUnspecified()) { succ = min; ovl.visConnect(ovlId, nodeId, succ, string("")); } } const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const { if( lid == LinkID::UNSPECIFIED ) return nodeId; LinkMapping::const_iterator i = linkMapping.find( lid ); if( i == linkMapping.end() ) return NodeID::UNSPECIFIED; else return i->second.node; } vector BaseOverlay::getLinkIDs( const NodeID& nid ) const { vector linkvector; BOOST_FOREACH( LinkPair item, linkMapping ){ if( item.second.node == nid || nid == NodeID::UNSPECIFIED ){ linkvector.push_back( item.second.link ); } } return linkvector; } void BaseOverlay::incomingRouteMessage(Message* msg){ // gets handled as normal data message receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED ); } void BaseOverlay::onNodeJoin(const NodeID& node){ JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node ); if( i == joiningNodes.end() ) return; logging_info( "node has successfully joined baseoverlay and overlay structure " << node.toString() ); joiningNodes.erase( i ); } void BaseOverlay::eventFunction(){ list oldlinks; time_t now = time(NULL); // first gather all the links from linkMapping that need droppin // don't directly drop, as the dropLink function affects the // linkMapping structure that we are traversing here. // drop links after a timeout of 30s BOOST_FOREACH( LinkPair item, linkMapping ){ if( item.second.autolink && difftime(now, item.second.lastuse) > 30) oldlinks.push_back( item.first ); } BOOST_FOREACH( const LinkID lnk, oldlinks ) { logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" ); dropLink( lnk ); } } }} // namespace ariba, overlay