// [License]
// The Ariba-Underlay Copyright
//
// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
//
// Institute of Telematics
// Universität Karlsruhe (TH)
// Zirkel 2, 76128 Karlsruhe
// Germany
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// The views and conclusions contained in the software and documentation
// are those of the authors and should not be interpreted as representing
// official policies, either expressed or implied, of the Institute of
// Telematics.
// [License]

#include "BaseOverlay.h"

#include "ariba/utility/misc/OvlVis.h"
#include "ariba/NodeListener.h"
#include "ariba/CommunicationListener.h"
#include "ariba/SideportListener.h"

#include "ariba/overlay/messages/OverlayMsg.h"
#include "ariba/overlay/messages/JoinRequest.h"
#include "ariba/overlay/messages/JoinReply.h"
#include "ariba/overlay/messages/LinkRequest.h"

namespace ariba {
namespace overlay {

use_logging_cpp(BaseOverlay);

/**
 * Constructs a base overlay that is not yet attached to a BaseCommunication.
 *
 * All identifiers start out unspecified, the state is invalid and the
 * default sideport listener is installed.
 */
BaseOverlay::BaseOverlay() :
	bc( NULL ),
	overlayInterface( NULL ),
	nodeId( NodeID::UNSPECIFIED ),
	spovnetId( SpoVNetID::UNSPECIFIED ),
	initiatorLink( LinkID::UNSPECIFIED ),
	state( BaseOverlayStateInvalid ),
	sideport( &SideportListener::DEFAULT )
{
}

/**
 * Destructor. Performs no cleanup of its own; the overlay interface is
 * released and the registrations are removed in stop().
 */
BaseOverlay::~BaseOverlay()
{
}

/**
 * Attaches the base overlay to a communication layer and starts it.
 *
 * Registers this object as message receiver and event listener at the
 * given BaseCommunication, creates the visualization node and starts the
 * periodic timer that drives eventFunction().
 *
 * @param _basecomm communication layer to run on; its address is stored
 * @param _nodeid   node id this overlay instance operates under
 */
void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){

	// remember the communication layer and our own identity
	bc = &_basecomm;
	nodeId = _nodeid;

	logging_info("creating base overlay");

	// we want both messages and link events from the communication layer
	_basecomm.registerMessageReceiver( this );
	_basecomm.registerEventListener( this );

	// show ourselves in the visualization, grey until we joined a spovnet
	ovl.visCreate( ovlId, nodeId, string(""), string("") );
	ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);

// 	if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
// 	    Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {
// 		ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);
// 	} else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
// 	    Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {
// 		ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);
// 	} else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
// 	    Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {
// 		ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
// 	}

	// periodic timer for the auto link management
	// (interval presumably in milliseconds -- TODO confirm against Timer)
	const int autoLinkTimerInterval = 5000;
	Timer::setInterval( autoLinkTimerInterval );
	Timer::start();
}

/**
 * Stops the base overlay: halts the timer, removes the registrations made
 * in start() and destroys the overlay interface, if any.
 */
void BaseOverlay::stop() {

	logging_info("deleting base overlay");

	// no more periodic auto link checks
	Timer::stop();

	// detach from the communication layer
	bc->unregisterMessageReceiver( this );
	bc->unregisterEventListener( this );

	// deleting a NULL pointer is a no-op, so no explicit check is required
	delete overlayInterface;
	overlayInterface = NULL;
}

/**
 * Starts joining an existing spovnet.
 *
 * Only the first step is performed here: a link towards the bootstrap
 * endpoint is requested. The join request itself is sent from onLinkUp()
 * once that link is up, and the join reply is handled in receiveMessage().
 *
 * @param id          id of the spovnet to join
 * @param bootstrapEp endpoint of the node to contact for joining
 */
void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){

	ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
	logging_info( "starting to join spovnet " << id.toString() <<
			" with nodeid " << nodeId.toString());

	// remember what we want to join and that the join is in progress,
	// so the linkup event of the initiator link can be recognized
	spovnetId = id;
	state = BaseOverlayStateJoinInitiated;

	// before anything can be requested from the initiator
	// we need a link to it
	initiatorLink = bc->establishLink( bootstrapEp );

	logging_info("join process initiated for " << id.toString() << "...");
}

void BaseOverlay::leaveSpoVNet(){

	logging_info( "leaving spovnet " << spovnetId );
	bool ret = ( state != this->BaseOverlayStateInvalid );

	logging_debug( "dropping all auto-links ..." );

	// now we start leaving the spovnet: fist delete all links
	// that we still have in the baseoverlay initiated by
	// some services, the leave the actual overlay structure,
	// then leave the spovnet

	// --> drop all service links

	vector<LinkID> servicelinks;
	BOOST_FOREACH( LinkPair item, linkMapping ){
		if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID )
			servicelinks.push_back( item.first );
	}
	BOOST_FOREACH( LinkID lnk, servicelinks ){
		// the dropLink function will remove
		// the item from the linkMapping
		dropLink( lnk );
	}

	// --> leave overlay structure

	logging_debug( "leaving overlay" );
	// first, leave the overlay interface
	if( overlayInterface != NULL )
		overlayInterface->leaveOverlay();

	// --> leave spovnet

	if( state != BaseOverlayStateInitiator ){

		// then, leave the spovnet baseoverlay
		OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
		bc->sendMessage( initiatorLink, &overMsg );

		// drop the link and set to correct state
		bc->dropLink( initiatorLink );
		initiatorLink = LinkID::UNSPECIFIED;
	}

	state = BaseOverlayStateInvalid;
	ovl.visShutdown( ovlId, nodeId, string("") );

	// inform all registered services of the event
	BOOST_FOREACH( NodeListener* i, nodeListeners ){
		if( ret ) i->onLeaveCompleted( spovnetId );
		else      i->onLeaveFailed( spovnetId );
	}
}

void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){

	// set the state that we are an initiator, this way incoming messages are
	// handled correctly
	logging_info( "creating spovnet " + id.toString() <<
			" with nodeid " << nodeId.toString() );

	spovnetId = id;
	state = BaseOverlayStateInitiator;

	overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
	if( overlayInterface == NULL ){
		logging_fatal( "overlay structure not supported" );
		state = BaseOverlayStateInvalid;
		return;
	}

	// bootstrap against ourselfs
	overlayInterface->joinOverlay();
	BOOST_FOREACH( NodeListener* i, nodeListeners )
		i->onJoinCompleted( spovnetId );

	ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
	ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
}


/// Establishes a link between this node and an arbitrary node that is
/// only known by its node id.
///
/// A LinkRequest carrying a nonce and our own endpoint descriptor is routed
/// through the overlay to the target node. The nonce is remembered in
/// pendingLinks so that the reply (see receiveMessage) can be matched to
/// the link id. A link mapping is registered right away, the remote node id
/// of that mapping stays unspecified for now.
///
/// @param node     id of the node to connect to
/// @param service  id of the service the link is created for; a listener
///                 must be bound to it
/// @param link_id  link id to use, a fresh one is created if unspecified
/// @return the id of the link, or LinkID::UNSPECIFIED if no listener is
///         bound to the service or no overlay structure is available
const LinkID BaseOverlay::establishLink( const NodeID& node,
		const ServiceID& service, const LinkID& link_id ) {

	if( !communicationListeners.contains( service ) ){
		logging_error( "no registered listener on serviceid " << service.toString() );
		return LinkID::UNSPECIFIED;
	}

	// the request is routed through the overlay structure, so without one
	// (not joined yet, or already stopped) the link cannot be established
	if( overlayInterface == NULL ){
		logging_error( "cannot establish link to node " << node.toString() <<
				", no overlay structure available" );
		return LinkID::UNSPECIFIED;
	}

	// copy link id and create a new one if necessary
	LinkID linkid = link_id;
	if (linkid.isUnspecified()) linkid = LinkID::create();

	// debug message
	logging_debug( "BaseOverlay called to establish link between node " <<
			node.toString() << " for service " << service.toString() );

	// build the nonce in unsigned arithmetic: shifting the signed result
	// of rand() left by 16 bits can overflow, which is undefined behaviour
	const uint32_t nonce =
		static_cast<uint32_t>( rand() ) ^
		( static_cast<uint32_t>( rand() ) << 16 ) ^
		static_cast<uint32_t>( time(NULL) );

	// create link request message and remember the nonce for the reply
	OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
	LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() );
	overlay_msg.encapsulate( &link_request_msg );
	pendingLinks.insert( make_pair(nonce, linkid) );

	// debug message
	logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() );

	// route message to overlay node
	overlayInterface->routeMessage( node, &overlay_msg );

	// contains() succeeded above, so a listener must be available
	CommunicationListener* receiver = communicationListeners.get( service );
	assert( receiver != NULL );

	// register the mapping; the remote node id is filled in later when the
	// update message of the remote side arrives
	LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver);
	linkMapping.insert( make_pair(linkid, item) );

	return linkid;
}

/**
 * Establishes a link to a node that is known by its endpoint descriptor.
 *
 * The link is requested directly from the communication layer and a link
 * mapping for the service is registered; the remote node id of the mapping
 * is unspecified at this point.
 *
 * @param ep      endpoint to connect to
 * @param service id of the service the link is created for; a listener
 *                must be bound to it
 * @param linkid  link id handed on to the communication layer
 * @return the link id returned by the communication layer, or
 *         LinkID::UNSPECIFIED if no listener is bound to the service
 */
const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
		const ServiceID& service, const LinkID& linkid  ){

	// links can only be created for services somebody listens on
	const bool serviceKnown = communicationListeners.contains( service );
	if( !serviceKnown ){
		logging_error( "no registered listener on serviceid " << service.toString() );
		return LinkID::UNSPECIFIED;
	}

	// let the communication layer do the actual work
	const LinkID link = bc->establishLink( ep, linkid );

	CommunicationListener* receiver = communicationListeners.get( service );
	assert( receiver != NULL );

	// remember which service and listener the link belongs to
	linkMapping.insert( make_pair(
		link, LinkItem( link, NodeID::UNSPECIFIED, service, receiver ) ) );

	return link;
}

void BaseOverlay::dropLink(const LinkID& link){

	logging_debug( "baseoverlay dropping link " << link.toString() );
	LinkMapping::iterator i = linkMapping.find( link );

	// find the link item to drop
	if( i == linkMapping.end() ){
		logging_warn( "can't drop link, mapping unknown " << link.toString() );
		return;
	}

	LinkItem item = i->second;

	// delete all queued messages
	if( item.waitingmsg.size() > 0 ){

		logging_warn( "dropping link " << link.toString() <<
			" that has " << item.waitingmsg.size() << " waiting messages" );

		item.deleteWaiting();
	}

	// erase the mapping and drop the link
	linkMapping.erase( i );
	bc->dropLink( link );

	// tell sideports and listeners of the drop
	item.interface->onLinkDown( link, item.node );
	sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId );
}

/**
 * Sends a data message over an existing link.
 *
 * If the link is not up yet the message is queued when the link is an auto
 * link (a serialized copy is stored and the payload of the passed message
 * is dropped), otherwise the message is discarded.
 *
 * @param message message to send
 * @param link    id of the link to send on
 * @return the value returned by the communication layer when the message
 *         was handed over, -1 if the link is unknown or not up yet
 */
seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){

	logging_debug( "baseoverlay is sending data message on link " << link.toString() );

	// look up the mapping for this link
	LinkMapping::iterator pos = linkMapping.find( link );
	if( pos == linkMapping.end() ){
		logging_error( "could not send message. link not found " << link.toString() );
		return -1;
	}

	LinkItem& entry = pos->second;
	entry.markused();

	// regular case: the link is up, wrap the message into an overlay data
	// message and pass it to the communication layer
	if( entry.linkup ){
		OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, entry.service, nodeId );
		overmsg.encapsulate( const_cast<Message*>(message) );

		return bc->sendMessage( link, &overmsg );
	}

	// link is not up and nobody will retry for us
	if( !entry.autolink ){
		logging_error("link " << link.toString() << " is not up yet, dropping message" );
		return -1;
	}

	// auto link still being established: keep a serialized copy until the
	// link is up, the queue is flushed in receiveMessage
	logging_info( "auto link " << link.toString() << " is not up yet, queueing message" );
	Data data = data_serialize( message );
	const_cast<Message*>(message)->dropPayload();
	entry.waitingmsg.push_back( new Message(data) );

	return -1;
}

/**
 * Sends a data message to a node for a given service.
 *
 * An existing link to that node and service is reused. If there is none,
 * an auto link is created through establishLink() and the message is
 * handed to sendMessage(message, link), which queues it until the link
 * is up.
 *
 * @param message message to send
 * @param node    id of the destination node
 * @param service id of the service the message belongs to
 * @return the result of sendMessage(message, link), or -1 if no auto link
 *         could be established
 */
seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){

	LinkID link = LinkID::UNSPECIFIED;

	//
	// see if we find a link for this node and service destination
	//

	LinkMapping::iterator entry = linkMapping.begin();
	for( ; entry != linkMapping.end(); ++entry ){
		if( entry->second.node == node && entry->second.service == service ){
			link = entry->second.link;
			break;
		}
	}

	//
	// if we found no link, create an auto link
	//

	if( link == LinkID::UNSPECIFIED ){

		logging_info( "no link could be found to send message to node " <<
				node.toString() << " for service " << service.toString() <<
				". creating auto link ...");

		// establishLink registers the mapping for the new link itself
		link = establishLink( node, service );

		// validate the result BEFORE touching the mapping entry. the
		// iterator of the search loop is reused so that it refers to the
		// new entry from here on
		if( link == LinkID::UNSPECIFIED )
			entry = linkMapping.end();
		else
			entry = linkMapping.find( link );

		if( entry == linkMapping.end() ){
			logging_error( "failed to establish auto link to node " << node.toString() <<
					" for service " << service.toString() );
			return -1;
		}

		// auto links get their messages queued and are dropped after
		// being idle (see eventFunction)
		entry->second.autolink = true;

		logging_debug( "establishing autolink in progress to node "
				<< node.toString() << " with new link-id " << link.toString() );

	} // if( link == LinkID::UNSPECIFIED )

	assert( link != LinkID::UNSPECIFIED );

	// mark the link as used, as we
	// now send a message through it
	entry->second.markused();

	// send the message through the link. a new auto link is not up yet, in
	// that case sendMessage(message, link) queues the message in the link
	// mapping and it is sent out once the link is up
	return sendMessage( message, link );
}

/**
 * Returns the endpoint descriptor for a link, as reported by the
 * communication layer.
 *
 * @param link id of the link to look up
 * @return whatever the communication layer returns for this link
 */
const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const
{
	// pure delegation, the base overlay keeps no endpoint information
	return bc->getEndpointDescriptor( link );
}

/**
 * Returns the endpoint descriptor of a node.
 *
 * @param node id of the node; our own id or NodeID::UNSPECIFIED selects the
 *             local endpoint
 * @return the local endpoint descriptor, the result of the overlay
 *         structure's node resolution, or EndpointDescriptor::UNSPECIFIED
 *         if no overlay structure exists
 */
const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {

	// our own endpoint is known to the communication layer
	const bool isLocal = ( node == nodeId ) || ( node == NodeID::UNSPECIFIED );
	if( isLocal )
		return bc->getEndpointDescriptor();

	// remote nodes are resolved through the overlay structure
	// TODO: if this is not a onehop overlay the operation will go asynchronously
	if( overlayInterface != NULL )
		return overlayInterface->resolveNode( node );

	logging_error( "overlay interface not set, cannot resolve endpoint" );
	return EndpointDescriptor::UNSPECIFIED;
}


/**
 * Binds a communication listener to a service id.
 *
 * @param listener listener to register
 * @param sid      service id to register the listener on
 * @return true if the listener was registered, false if the service id is
 *         already taken
 */
bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){

	logging_debug( "binding communication listener " << listener
		<< " on serviceid " << sid.toString() );

	// only one listener per service id
	const bool alreadyBound = communicationListeners.contains( sid );

	if( !alreadyBound ){
		communicationListeners.registerItem( listener, sid );
		return true;
	}

	logging_error( "some listener already registered for service id "
		<< sid.toString() );
	return false;
}

/**
 * Installs a sideport listener and configures it with this overlay.
 *
 * @param _sideport listener to install, must not be NULL
 * @return true if the listener was installed, false if NULL was passed (the
 *         currently installed listener is kept in that case)
 */
bool BaseOverlay::registerSidePort(SideportListener* _sideport){

	// a NULL sideport would be dereferenced on every link event
	if( _sideport == NULL ){
		logging_warn( "cannot register NULL sideport listener" );
		return false;
	}

	sideport = _sideport;
	_sideport->configure( this );

	// the function is declared to return bool: flowing off the end
	// without a return value is undefined behaviour
	return true;
}

/**
 * Removes the installed sideport listener by switching back to the
 * default sideport listener.
 *
 * @param _sideport not evaluated; the default listener is restored
 *                  regardless of which listener is currently installed
 * @return always true
 */
bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){

	(void)_sideport; // intentionally unused

	sideport = &SideportListener::DEFAULT;

	// the function is declared to return bool: flowing off the end
	// without a return value is undefined behaviour
	return true;
}

/**
 * Removes the binding of a communication listener to a service id.
 *
 * @param listener listener that is expected to be bound to the service id
 * @param sid      service id to release
 * @return true if the binding was removed, false if nothing is bound to
 *         the service id or a different listener is bound to it
 */
bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){

	logging_debug( "unbinding listener " << listener
		<< " from serviceid " << sid.toString() );

	if( communicationListeners.contains( sid ) ){

		// only the listener that owns the binding may remove it
		if( communicationListeners.get(sid) == listener ){
			communicationListeners.unregisterItem( sid );
			return true;
		}

		logging_warn( "listener bound to service id " << sid.toString()
			<< " is different than listener trying to unbind" );
		return false;
	}

	logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
	return false;
}

/**
 * Registers a node listener that is notified about join and leave events.
 *
 * @param listener listener to add
 * @return true if the listener was added, false if it is bound already
 */
bool BaseOverlay::bind(NodeListener* listener){

	logging_debug( "binding node listener " << listener );

	// every listener is kept at most once
	const bool known =
		std::find( nodeListeners.begin(), nodeListeners.end(), listener ) != nodeListeners.end();

	if( known ){
		logging_warn( "node listener " << listener << " is already bound, cannot bind" );
		return false;
	}

	nodeListeners.push_back( listener );
	return true;
}

/**
 * Removes a previously registered node listener.
 *
 * @param listener listener to remove
 * @return true if the listener was removed, false if it was not bound
 */
bool BaseOverlay::unbind(NodeListener* listener){

	logging_debug( "unbinding node listener " << listener );

	NodeListenerVector::iterator pos =
		std::find( nodeListeners.begin(), nodeListeners.end(), listener );

	if( pos != nodeListeners.end() ){
		nodeListeners.erase( pos );
		return true;
	}

	logging_warn( "node listener " << listener << " is not bound, cannot unbind" );
	return false;
}

void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){

	logging_debug( "base overlay received linkup event " + id.toString() );
	// TODO: updateOvlVis( getNodeID(id) );

	//
	// if we get up a link while we are in the
	// join phase and this is the link that
	// we have initiated towards the spovnet owner
	// continue the join process by sending
	// a join request message through the link
	//

	if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){

		logging_info(
			"Join has been initiated by me and the link is now up. " <<
			"sending out join request for SpoVNet " << spovnetId.toString()
		);

		OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
		JoinRequest joinmsg( spovnetId, nodeId );
		overMsg.encapsulate( &joinmsg );

		state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated
		bc->sendMessage( id, &overMsg );

		return;

	} // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink)

	//
	// otherwise this is a link initiated by a service
	// then we exchange update messages to exchange the
	// service id and node id for the link. in this case
	// we should have a link mapping for this link. if
	// we have no link mapping this link was initiated by
	// the remote side.
	//

	LinkMapping::iterator i = linkMapping.find( id );

	if( i == linkMapping.end() ){

		LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT );
		linkMapping.insert( make_pair(id, item) );

	} else {

		logging_debug( "sending out OverlayMessageTypeUpdate" <<
				" for service " << i->second.service.toString() <<
				" with local node id " << nodeId.toString() <<
				" on link " << id.toString() );

		OverlayMsg overMsg(
			OverlayMsg::OverlayMessageTypeUpdate,
			i->second.service,
			nodeId
			);

		bc->sendMessage( id, &overMsg );
		i->second.markused();

	} // if( i == linkMapping.end() )

	// the link is only valid for the service when we receive
	// the OverlayMessageTypeUpdate from the remote node and
	// have the nodeid and serviceid for the link!
}

void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){

	logging_debug( "link went down " << id.toString() );

	//
	// tell the service that the link went
	// down and remove the mapping
	//

	LinkMapping::iterator i = linkMapping.find( id );
	if( i == linkMapping.end() ) {
		// this can also be one of the baseoverlay links that
		// no mapping is stored for. therefore we issue no warning.
		// it can also be a link that has been dropped and the
		// mapping is already deleted in the dropLink function.
		// also, the service notification is issued then in dropLink
		return;
	}

	i->second.interface->onLinkDown( id, i->second.node );
	sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId );

	// delete all queued messages
	if( i->second.waitingmsg.size() > 0 ){

		logging_warn( "dropping link " << id.toString() <<
			" that has " << i->second.waitingmsg.size() << " waiting messages" );

		i->second.deleteWaiting();
	}

	linkMapping.erase( i );
}

void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){

	logging_debug( "link changed " << id.toString() );

	//
	// tell the service that the link changed
	//

	LinkMapping::iterator i = linkMapping.find( id );
	if( i == linkMapping.end() ) return;

	i->second.interface->onLinkChanged( id, i->second.node );
	sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId );

	// TODO call onLinkQoSChanged?

	i->second.markused();
}

void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){

	logging_debug( "link failed " << id.toString() );

	//
	// tell the service that the link failed
	//

	LinkMapping::iterator i = linkMapping.find( id );
	if( i == linkMapping.end() ) return;

	i->second.interface->onLinkFail( id, i->second.node );
	sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId );

	i->second.markused();
}

void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {

	logging_debug( "link qos changed " << id.toString() );

	//
	// tell the service that the link qos has changed
	//

	LinkMapping::iterator i = linkMapping.find( id );
	if( i == linkMapping.end() ) return;

	// TODO: convert QoSParameterSet to the LinkProperties properties
	// TODO: currently not in the interface: i->second.interface->onLinkQoSChanged( id, i->second.node, LinkProperties::DEFAULT );

	i->second.markused();
}

/**
 * Link request handler of the communication layer; every request is
 * accepted.
 *
 * The services are asked later, in receiveMessage, whether they accept
 * the link: at that point the update message of the remote side has
 * arrived and its node id is known.
 *
 * @param id     id of the requested link (not used here)
 * @param local  local locator of the link (not used here)
 * @param remote remote locator of the link, only used for logging
 * @return always true
 */
bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){

	const bool accept = true;

	logging_debug("received link request from " << remote->toString() << ", accepting");

	return accept;
}


/**
 * Central dispatcher for all overlay messages received by the base overlay.
 *
 * The OverlayMsg is decapsulated from the incoming message and handled
 * depending on its type:
 *  - Data:        forwarded to the listener bound to the service id
 *  - JoinRequest: answered with a JoinReply (only in initiator state)
 *  - JoinReply:   completes or aborts our own join (only in join-initiated state)
 *  - Update:      completes the link mapping and brings a service link up
 *  - Bye:         a node, or the initiator, left the spovnet
 *  - LinkRequest: request/reply pair routed through the overlay to set up
 *                 a link between two nodes
 *
 * This function is also called from incomingRouteMessage() with
 * LinkID::UNSPECIFIED as link.
 *
 * @param message the received message
 * @param link    id of the link the message arrived on
 * @return true or false as returned by the branch that handled the message;
 *         false if no OverlayMsg could be decapsulated or the message type
 *         is not handled in the current state
 */
bool BaseOverlay::receiveMessage(const Message* message,
	const LinkID& link, const NodeID&
	/*the nodeid is invalid in this case! removed var to prevent errors*/ ){

	// decapsulate overlay message
	// NOTE(review): ownership of the decapsulated objects (overlayMsg and the
	// messages decapsulated from it below) is not visible in this file; none
	// of them is deleted here -- confirm that this does not leak
	logging_debug( "receiveMessage: " << message->toString());
	OverlayMsg* overlayMsg = const_cast<Message*>(message)->decapsulate<OverlayMsg>();
	if( overlayMsg == NULL ) return false;

	// mark the link as in action
	LinkMapping::iterator item = linkMapping.find( link );
	if( item != linkMapping.end() ) item->second.markused();

	/* ************************************************************************
	 * handle user data that we forward to the appropriate service using the
	 * service id in the message. as we don't know the class of message that
	 * the service handles, we forward it as a pure Message
	 */
	if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {

		logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );

		const ServiceID& service = overlayMsg->getService();
		CommunicationListener* serviceListener = communicationListeners.get( service );

		logging_debug( "received data for service " << service.toString() );

		// data for a service without a listener is silently discarded,
		// the message still counts as handled
		if( serviceListener != NULL )
			serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link );

		return true;

	} // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) )

	/* ************************************************************************
	 * Handle spovnet instance join requests
	 */
	else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) &&
						state == BaseOverlayStateInitiator){

		logging_debug(
			"baseoverlay received message of type OverlayMessageTypeJoinRequest"
		);

		// NOTE(review): unlike the OverlayMsg above, the result of this
		// decapsulate call is not checked for NULL before it is used
		JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
		logging_info( "received join request for spovnet " <<
					 joinReq->getSpoVNetID().toString() );

		/* make sure that the node actually wants to join
		 * the correct spovnet id that we administrate */
		if( joinReq->getSpoVNetID() != spovnetId ){
			logging_error( "received join request for spovnet we don't handle " <<
					joinReq->getSpoVNetID().toString() );
			return false;
		}

		//
		// only if all services allow the node to join it is allowed
		// using the isJoinAllowed interface security policies can be
		// implemented by higher layer services
		//

		// TODO: here you can implement mechanisms to deny joining of a node
		// (currently every join request is allowed)
		bool allow = true;

		logging_info( "sending back join reply for spovnet " <<
				spovnetId.toString() << " to node " <<
				overlayMsg->getSourceNode().toString() <<
				". result: " << (allow ? "allowed" : "denied") );

		// remember the node until onNodeJoin() reports it
		joiningNodes.push_back( overlayMsg->getSourceNode() );

		//
		// send back our spovnetid, default overlay parameters, join allow
		// result, and ourself as the end-point to bootstrap the overlay against
		//

		assert( overlayInterface != NULL );
		OverlayParameterSet parameters = overlayInterface->getParameters();

		OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
		JoinReply replyMsg( spovnetId, parameters,
					allow, getEndpointDescriptor() );

		retmsg.encapsulate(&replyMsg);
		bc->sendMessage( link, &retmsg );

		return true;

	} // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest)
	  //          && state == BaseOverlayStateInitiator)

	/* ************************************************************************
	 * handle replies to spovnet instance join requests
	 */
	else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
						state == BaseOverlayStateJoinInitiated){

		logging_debug(
			"baseoverlay received message of type OverlayMessageTypeJoinReply");

		// NOTE(review): result of decapsulate is not checked for NULL
		JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
		logging_info( "received spovnet join reply" );

		// ensure that we actually wanted to get into the spovnet whose id is
		// in the message
		if( replyMsg->getSpoVNetID() != spovnetId ){
			logging_error( "received spovnet join reply for spovnet " <<
					replyMsg->getSpoVNetID().toString() <<
					" but we wanted to join spovnet " <<
					spovnetId.toString() );

			// state does not change here, maybe the reply does come in later
			return false;
		}

		// if we did not get access to the spovnet notify of the failure and
		// close the link to the initiator
		if( ! replyMsg->getJoinAllowed() ){

			logging_error( "our join request has been denied" );

			bc->dropLink( initiatorLink );
			initiatorLink = LinkID::UNSPECIFIED;
			state = BaseOverlayStateInvalid;

			// inform all registered services of the event
			BOOST_FOREACH( NodeListener* i, nodeListeners ){
				i->onJoinFailed( spovnetId );
			}

			return true;
		}

		logging_info( "join request has been accepted for spovnet " <<
				spovnetId.toString() );

		// if we did get access to the spovnet we try to create the overlay
		// structure as given in the reply message
		overlayInterface = OverlayFactory::create( *this,
				replyMsg->getParam(), nodeId, this );

		if( overlayInterface == NULL ){
			logging_error( "overlay structure not supported" );

			bc->dropLink( initiatorLink );
			initiatorLink = LinkID::UNSPECIFIED;
			state = BaseOverlayStateInvalid;

			// inform all registered services of the event
			BOOST_FOREACH( NodeListener* i, nodeListeners )
				i->onJoinFailed( spovnetId );

			return true;
		}

		/* now start the join process for the overlay. the join process for the
		 * spovnet baseoverlay is now complete. we use the endpoint for overlay
		 * structure bootstrapping that the initiator provided in his reply
		 * message */
		state = BaseOverlayStateCompleted;
		ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);

		overlayInterface->createOverlay();
		overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );

		// inform all registered services of the event
		BOOST_FOREACH( NodeListener* i, nodeListeners ){
			i->onJoinCompleted( spovnetId );
		}

		return true;

	} // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated)


	/* ************************************************************************
	 * handle update messages for link establishment
	 */
	else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){

		logging_debug(
			"baseoverlay received message of type OverlayMessageTypeUpdate"
		);

		const NodeID& sourcenode = overlayMsg->getSourceNode();
		const ServiceID& service = overlayMsg->getService();

		// linkmapping for the link available? no-> ignore
		LinkMapping::iterator i = linkMapping.find( link );
		if( i == linkMapping.end() ) {
			logging_warn( "received overlay update message for link " <<
					link.toString() << " for which we have no mapping" );
			return false;
		}

		// update our link mapping information for this link
		bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
		i->second.node = sourcenode;
		i->second.service = service;

		// if our link information changed, we send out an update, too.
		// on the side that accepted the link this is what delivers our
		// node id and service id to the initiating side
		if( changed ){
			OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
			bc->sendMessage( link, &overMsg );
		}

		// set the correct listener service for the linkitem
		// now we can tell the registered service of the linkup event
		if( !communicationListeners.contains( service ) ){
			logging_warn( "linkup event for service that has not been registered" );
			return false;
		}

		CommunicationListener* iface = communicationListeners.get( service );
		if( iface == NULL || iface == &CommunicationListener::DEFAULT ){
			logging_warn( "linkup event for service that has been registered "
				"with a NULL interface" );
			return true;
		}

		i->second.interface = iface;
		i->second.markused();

		// ask the service whether it wants to accept this link
		if( !iface->onLinkRequest(sourcenode) ){

			logging_debug("link " << link.toString() <<
								" has been denied by service " << service.toString() << ", dropping link");

			// prevent onLinkDown calls to the service
			i->second.interface = &CommunicationListener::DEFAULT;
			// drop the link; dropLink erases the mapping, so the iterator i
			// must not be used afterwards
			dropLink( link );

			return true;
		}

		//
		// link has been accepted, link is now up, send messages out first
		//

		i->second.linkup = true;
		logging_debug("link " << link.toString() <<
						" has been accepted by service " << service.toString() << " and is now up");

		// flush the messages that were queued by sendMessage while the
		// auto link was not up yet
		if( i->second.waitingmsg.size() > 0 ){
			logging_info( "sending out queued messages on link " << link.toString() );

			BOOST_FOREACH( Message* msg, i->second.waitingmsg ){
				sendMessage( msg, link );
				delete msg;
			}

			i->second.waitingmsg.clear();
		}

		// call the notification functions
		iface->onLinkUp( link, sourcenode );
		sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );

		return true;

	} // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) )

	/* ************************************************************************
	 * handle bye messages
	 */
	else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) {

		logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" );
		logging_debug( "Received bye message from " <<
				overlayMsg->getSourceNode().toString() );

		/* if we are the initiator and receive a bye from a node
		 * the node just left. if we are a node and receive a bye
		 * from the initiator, we have to close, too.
		 */
		// NOTE(review): spovnetInitiator is not assigned anywhere in this
		// file -- verify that it is set elsewhere, otherwise this comparison
		// is made against its initial value
		if( overlayMsg->getSourceNode() == spovnetInitiator ){

			bc->dropLink( initiatorLink );
			initiatorLink = LinkID::UNSPECIFIED;
			state = BaseOverlayStateInvalid;

			logging_fatal( "initiator ended spovnet" );

			// inform all registered services of the event
			BOOST_FOREACH( NodeListener* i, nodeListeners ){
				i->onLeaveFailed( spovnetId );
			}

		} else {
			// a node that said goodbye and we are the initiator don't have to
			// do much here, as the node also will go out of the overlay
			// structure
			logging_info( "node left " << overlayMsg->getSourceNode() );
		}

		return true;

	} // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye))

	/* ************************************************************************
	 * handle link request forwarded through the overlay
	 */
	else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) {
		// NOTE(review): result of decapsulate is not checked for NULL
		LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
		const ServiceID& service = overlayMsg->getService();
		if (linkReq->isReply()) {

			// reply to a request we sent in establishLink: the nonce
			// identifies the link id that was chosen back then

			// find link
			PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
			if ( i == pendingLinks.end() ) {
				logging_error( "Nonce not found in link request" );
				return true;
			}

			// debug message
			logging_debug( "LinkRequest reply received. Establishing link "
				<< i->second << " to " << (linkReq->getEndpoint()->toString())
				<< " for service " << service.toString()
				<< " with nonce " << linkReq->getNonce()
			);

			// establishing link
			// NOTE(review): the entry stays in pendingLinks, it is not erased
			// here or anywhere else in this file
			bc->establishLink( *linkReq->getEndpoint(), i->second );
		} else {
			// request from a remote node: answer with a reply that carries
			// the same nonce and our own endpoint descriptor
			OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
			LinkRequest link_request_msg(
					linkReq->getNonce(), &bc->getEndpointDescriptor(), true );
			overlay_msg.encapsulate( &link_request_msg );

			// debug message
			logging_debug( "Sending LinkRequest reply for link with nonce " <<
					linkReq->getNonce()	);

			// route message back over overlay
			// NOTE(review): overlayInterface is used without a NULL check here
			overlayInterface->routeMessage(
				overlayMsg->getSourceNode(), &overlay_msg
			);
		}
		// NOTE(review): except for the unknown-nonce case this branch has no
		// return statement and falls through to the final 'return false',
		// although the message has been handled -- confirm whether callers
		// rely on the return value
	} // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest))

	/* ************************************************************************
	 * unknown message type ... error!
	 */
	else {

		// also reached for join requests/replies that arrive in a state
		// in which they are not handled (see the state checks above)
		logging_error( "received message in invalid state! don't know " <<
				"what to do with this message of type " <<
				overlayMsg->getType() );
		return false;

	} // else

	// only reached from the link request branch
	return false;
}

/**
 * Sends a message to all nodes known to the overlay structure, except to
 * ourselves. Each message is sent with sendMessage(message, node, service).
 *
 * @param message message to send
 * @param service id of the service the message belongs to
 */
void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){

	logging_debug( "broadcasting message to all known nodes " <<
			"in the overlay from service " + service.toString() );

	// snapshot of the nodes the overlay structure currently knows
	OverlayInterface::NodeList recipients = overlayInterface->getKnownNodes();

	for( OverlayInterface::NodeList::iterator target = recipients.begin();
			target != recipients.end(); ++target ){

		// the list may contain our own id, don't send to ourselves
		if( *target == nodeId ) continue;

		sendMessage( message, *target, service );
	}
}

/**
 * Returns the nodes known to the overlay structure without our own node.
 *
 * @return copy of the known nodes with the first occurrence of our own
 *         node id removed
 */
vector<NodeID> BaseOverlay::getOverlayNeighbors() const {

	vector<NodeID> neighbors = overlayInterface->getKnownNodes();

	// the known nodes _can_ include our own node, take it out
	vector<NodeID>::iterator self = find( neighbors.begin(), neighbors.end(), this->nodeId );
	if( self != neighbors.end() )
		neighbors.erase( self );

	return neighbors;
}

/**
 * Updates the visualization with a newly seen node.
 *
 * Keeps track of the smallest (min) and largest (max) node id seen so far
 * and of a successor (succ) of our own node id, and connects our node to
 * the successor in the visualization. Our own id and unspecified ids are
 * ignored.
 *
 * Within this file the function is only referenced from a TODO in onLinkUp.
 *
 * @param n id of the node that has been seen
 */
void BaseOverlay::updateOvlVis( const NodeID& n ) {
	// local copy of the id, used for all comparisons below
	NodeID node = n;
/*	void visShowNodeBubble (
        NETWORK_ID network,
        NodeID& node,
        string label
        );
*/
	using namespace std;

	// nothing to do for ourselves or for an unknown node
	if (node == nodeId || node.isUnspecified()) return;

	// min/max
	if ( node < min || min.isUnspecified() ) min = node;
	if ( node > max || max.isUnspecified() ) max = node;

	// successor: take the node if we have no successor yet, or if it lies
	// above our own id and either the current successor lies below our id
	// or the node is closer to our id than the current successor
	if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
		// replace the connection to the previous successor
		if (!succ.isUnspecified() && node != succ)
			ovl.visDisconnect(ovlId, nodeId, succ, string(""));
		succ = node;
		ovl.visConnect(ovlId, nodeId, succ, string(""));
	}

	// set successor (circle-wrap)
	// NOTE(review): this branch appears to be unreachable. whenever succ is
	// unspecified the condition above is true and succ is set to node, which
	// cannot be unspecified at this point -- confirm the intended wrap logic
	if (succ.isUnspecified() && !min.isUnspecified()) {
		succ = min;
		ovl.visConnect(ovlId, nodeId, succ, string(""));
	}
}

/**
 * Returns the node id that belongs to a link.
 *
 * @param lid id of the link; LinkID::UNSPECIFIED selects our own node
 * @return our own node id for an unspecified link, the node id stored in
 *         the link mapping for a known link, NodeID::UNSPECIFIED otherwise
 */
const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {

	// no link given: the caller asks for the local node
	if( lid == LinkID::UNSPECIFIED )
		return nodeId;

	LinkMapping::const_iterator pos = linkMapping.find( lid );

	if( pos != linkMapping.end() )
		return pos->second.node;

	return NodeID::UNSPECIFIED;
}

/**
 * Returns the ids of all links in the link mapping that lead to a node.
 *
 * @param nid node id to filter for; NodeID::UNSPECIFIED selects all links
 * @return ids of the matching links
 */
vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {

	vector<LinkID> result;

	// an unspecified node id acts as wildcard
	const bool matchAll = ( nid == NodeID::UNSPECIFIED );

	for( LinkMapping::const_iterator pos = linkMapping.begin();
			pos != linkMapping.end(); ++pos ){

		if( matchAll || pos->second.node == nid )
			result.push_back( pos->second.link );
	}

	return result;
}

/**
 * Entry point for messages that were routed to us through the overlay
 * structure. They are handled by receiveMessage() like any other message,
 * with an unspecified link id and node id.
 *
 * @param msg the routed message
 */
void BaseOverlay::incomingRouteMessage(Message* msg){

	// routed messages are not associated with a link or node id here
	const LinkID& noLink = LinkID::UNSPECIFIED;
	const NodeID& noNode = NodeID::UNSPECIFIED;

	receiveMessage( msg, noLink, noNode );
}

void BaseOverlay::onNodeJoin(const NodeID& node){

	JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
	if( i == joiningNodes.end() ) return;

	logging_info( "node has successfully joined baseoverlay and overlay structure "
				<< node.toString() );

	joiningNodes.erase( i );
}

void BaseOverlay::eventFunction(){

	list<LinkID> oldlinks;
	time_t now = time(NULL);

	// first gather all the links from linkMapping that need droppin
	// don't directly drop, as the dropLink function affects the
	// linkMapping structure that we are traversing here.
	// drop links after a timeout of 30s

	BOOST_FOREACH( LinkPair item, linkMapping ){
		if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
			oldlinks.push_back( item.first );
	}

	BOOST_FOREACH( const LinkID lnk, oldlinks ) {
		logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
		dropLink( lnk );
	}
}

}} // namespace ariba, overlay
