// [License]
// The Ariba-Underlay Copyright
//
// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
//
// Institute of Telematics
// Universität Karlsruhe (TH)
// Zirkel 2, 76128 Karlsruhe
// Germany
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// The views and conclusions contained in the software and documentation
// are those of the authors and should not be interpreted as representing
// official policies, either expressed or implied, of the Institute of
// Telematics.
// [License]

#include "BaseOverlay.h"

#include <sstream>
#include <iostream>
#include <string>
#include <boost/foreach.hpp>

#include "ariba/NodeListener.h"
#include "ariba/CommunicationListener.h"
#include "ariba/SideportListener.h"

#include "ariba/overlay/LinkDescriptor.h"

#include "ariba/overlay/messages/OverlayMsg.h"
#include "ariba/overlay/messages/JoinRequest.h"
#include "ariba/overlay/messages/JoinReply.h"

#include "ariba/utility/misc/OvlVis.h"

namespace ariba {
namespace overlay {

/* *****************************************************************************
 * PREREQUESITES
 * ****************************************************************************/

CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
	if( !communicationListeners.contains( service ) ) {
		logging_error( "No listener found for service " << service.toString() );
		return NULL;
	}
	CommunicationListener* listener = communicationListeners.get( service );
	assert( listener != NULL );
	return listener;
}

// link descriptor handling ----------------------------------------------------

LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
	BOOST_FOREACH( LinkDescriptor* lp, links )
		if ((communication ? lp->communicationId : lp->overlayId) == link)
			return lp;
	return NULL;
}

const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
	BOOST_FOREACH( const LinkDescriptor* lp, links )
		if ((communication ? lp->communicationId : lp->overlayId) == link)
			return lp;
	return NULL;
}

/// erases a link descriptor
void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
	for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
		LinkDescriptor* ld = *i;
		if ((communication ? ld->communicationId : ld->overlayId) == link) {
			delete ld;
			links.erase(i);
			break;
		}
	}
}

/// adds a link descriptor
LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
	LinkDescriptor* desc = getDescriptor( link );
	if ( desc == NULL ) {
		desc = new LinkDescriptor();
		if (!link.isUnspecified()) desc->overlayId = link;
		links.push_back(desc);
	}
	return desc;
}

/// returns a auto-link descriptor
LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
	// search for a descriptor that is already up
	BOOST_FOREACH( LinkDescriptor* lp, links )
		if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
			return lp;
	// if not found, search for one that is about to come up...
	BOOST_FOREACH( LinkDescriptor* lp, links )
		if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
			return lp;
	return NULL;
}

/// stabilizes link information
void BaseOverlay::stabilizeLinks() {
	// send keep-alive messages over established links
	BOOST_FOREACH( LinkDescriptor* ld, links ) {
		if (!ld->up) continue;
		OverlayMsg msg( OverlayMsg::typeLinkAlive,
			OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
		msg.setRelayed(true);
		if (ld->relayed) msg.setRouteRecord(true);
		send_link( &msg, ld->overlayId );
	}

	// iterate over all links and check for time boundaries
	vector<LinkDescriptor*> oldlinks;
	time_t now = time(NULL);
	BOOST_FOREACH( LinkDescriptor* ld, links ) {

		// keep alives and not up? yes-> link connection request is stale!
		if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {

			// increase counter
			ld->keepAliveMissed++;

			// missed more than four keep-alive messages (10 sec)? -> drop link
			if (ld->keepAliveMissed > 4) {
				logging_info( "Link connection request is stale, closing: " << ld );
				oldlinks.push_back( ld );
				continue;
			}
		}

		if (!ld->up) continue;

		// remote used as relay flag
		if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
			ld->relaying = false;

		// drop links that are dropped and not used as relay
		if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
			oldlinks.push_back( ld );
			continue;
		}

		// auto-link time exceeded?
		if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
			oldlinks.push_back( ld );
			continue;
		}

		// keep alives missed? yes->
		if ( difftime( now, ld->keepAliveTime ) > 2 ) {

			// increase counter
			ld->keepAliveMissed++;

			// missed more than four keep-alive messages (4 sec)? -> drop link
			if (ld->keepAliveMissed >= 4) {
				logging_info( "Link is stale, closing: " << ld );
				oldlinks.push_back( ld );
				continue;
			}
		}
	}

	// drop links
	BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
		logging_info( "Link timed out. Dropping " << ld );
		ld->relaying = false;
		dropLink( ld->overlayId );
	}

	// show link state
	counter++;
	if (counter>=4) showLinks();
	if (counter>=4 || counter<0) counter = 0;
}

/// shows the current link state
void BaseOverlay::showLinks() {
	int i=0;
	logging_info("--- link state -------------------------------");
	BOOST_FOREACH( LinkDescriptor* ld, links ) {
		logging_info("link " << i << ": " << ld);
		i++;
	}
	logging_info("----------------------------------------------");
}

/// compares two arbitrary links to the same node
int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
	LinkDescriptor* lhsld = getDescriptor(lhs);
	LinkDescriptor* rhsld = getDescriptor(rhs);
	if (lhsld==NULL || rhsld==NULL
		|| !lhsld->up || !rhsld->up
		|| lhsld->remoteNode != rhsld->remoteNode) return -1;

	if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId)  )
		return -1;

	return 1;
}


// internal message delivery ---------------------------------------------------

/// routes a message to its destination node
void BaseOverlay::route( OverlayMsg* message ) {

	// exceeded time-to-live? yes-> drop message
	if (message->getNumHops() > message->getTimeToLive()) {
		logging_warn("Message exceeded TTL -> drop message!");
		return;

	// no-> forward message
	} else {
		// destinastion myself? yes-> handle message
		if (message->getDestinationNode() == nodeId) {
			logging_warn("Usually I should not route messages to myself!");
			Message msg;
			msg.encapsulate(message);
			handleMessage( &msg, NULL );
		} else {
			// no->send message to next hop
			send( message, message->getDestinationNode() );
		}
	}
}

/// sends a message to another node, delivers it to the base overlay class
seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
	LinkDescriptor* next_link = NULL;

	// drop messages to unspecified destinations
	if (destination.isUnspecified()) return -1;

	// send messages to myself -> handle message and drop warning!
	if (destination == nodeId) {
		logging_warn("Sent message to myself. Handling message.")
		Message msg;
		msg.encapsulate(message);
		handleMessage( &msg, NULL );
		return -1;
	}

	// relay path known? yes-> send over relay link
	next_link = getRelayLinkTo( destination );
	if (next_link != NULL) {
		next_link->setRelaying();
		return send(message, next_link);
	}

	// no-> relay path! route over overlay path
	LinkID next_id = overlayInterface->getNextLinkId( destination );
	if (next_id.isUnspecified()) {
		logging_error("Could not send message. No next hop found to " << destination );
		return -1;
	}

	// get link descriptor, up and running? yes-> send message
	next_link = getDescriptor(next_id);
	if (next_link != NULL && next_link->up)
		return send(message, next_link);

	// no-> error, dropping message
	else {
		logging_error("Could not send message. Link not known or up");
		return -1;
	}

	// not reached-> fail
	return -1;
}

/// send a message using a link descriptor, delivers it to the base overlay class
seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
	// check if null
	if (ldr == NULL) {
		logging_error("Can not send message to " << message->getDestinationAddress());
		return -1;
	}

	// check if up
	if (!ldr->up && !ignore_down) {
		logging_error("Can not send message. Link not up:" << ldr );
		return -1;
	}
	LinkDescriptor* ld = NULL;

	// handle relayed link
	if (ldr->relayed) {
		logging_debug("send(): Resolving direct link for relayed link to "
			<< ldr->remoteNode);
		ld = getRelayLinkTo( ldr->remoteNode );
		if (ld==NULL) {
			LinkID lnk = overlayInterface->getNextLinkId(ldr->remoteNode);
			if (!lnk.isUnspecified())
				ld = getDescriptor(lnk);
			if (ld!=NULL && ld->relayed)
				ld = NULL;
		}
		if (ld==NULL) {
			logging_error("Direct link not found.");
			return -1;
		}
		message->setRelayed();
		ld->setRelaying();
	} else
		ld = ldr;

	// handle direct link
	if (ld->communicationUp) {
		logging_debug("send(): Sending message over direct link.");
		return bc->sendMessage( ld->communicationId, message );
	} else {
		logging_error("send(): Could not send mesage. "
				"Not a relayed link and direct link is not up.");
		return -1;
	}
	return -1;
}

seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
	const ServiceID& service) {
	message->setSourceNode(nodeId);
	message->setDestinationNode(remote);
	message->setService(service);
	send( message, remote );
}

seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
	LinkDescriptor* ld = getDescriptor(link);
	if (ld==NULL) {
		logging_error("Cannot find descriptor to link id=" << link.toString());
		return -1;
	}
	message->setSourceNode(nodeId);
	message->setDestinationNode(ld->remoteNode);

	message->setSourceLink(ld->overlayId);
	message->setDestinationLink(ld->remoteLink);

	message->setService(ld->service);
	return send( message, ld, ignore_down );
}

// relay route management ------------------------------------------------------

/// stabilize relay information
void BaseOverlay::stabilizeRelays() {
	vector<relay_route>::iterator i = relay_routes.begin();
	while (i!=relay_routes.end() ) {
		relay_route& route = *i;
		LinkDescriptor* ld = getDescriptor(route.link);
		if (ld==NULL
			|| !ld->up
			|| ld->keepAliveMissed != 0
			|| !ld->communicationUp
			|| difftime(route.used, time(NULL)) > 4) {
			logging_info("Forgetting relay information to node "
					<< route.node.toString() );
			i = relay_routes.erase(i);
		} else
			i++;
	}
}

void BaseOverlay::removeRelayLink( const LinkID& link ) {
	vector<relay_route>::iterator i = relay_routes.begin();
	while (i!=relay_routes.end() ) {
		relay_route& route = *i;
		if (route.link == link ) i = relay_routes.erase(i); else i++;
	}
}

void BaseOverlay::removeRelayNode( const NodeID& remote ) {
	vector<relay_route>::iterator i = relay_routes.begin();
	while (i!=relay_routes.end() ) {
		relay_route& route = *i;
		if (route.node == remote ) i = relay_routes.erase(i); else i++;
	}
}

/// refreshes relay information
void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {

	// handle relayed messages from real links only
	if (ld == NULL
		|| ld->relayed
		|| !message->isRelayed()
//		|| !(message->getService() == OverlayInterface::OVERLAY_SERVICE_ID)
		|| message->getSourceNode()==nodeId ) return;

	// check wheter this node is already part of the routing table
	LinkID next_link = overlayInterface->getNextLinkId(message->getSourceNode());
	if (next_link == ld->overlayId) return;
	ld->setRelaying();

	// try to find source node
	BOOST_FOREACH( relay_route& route, relay_routes ) {

		// relay route found? yes->
		if ( route.node == message->getSourceNode() ) {

			// refresh timer
			route.used = time(NULL);

			// route has a shorter hop count? yes-> replace
			if (route.hops > message->getNumHops() ) {
				logging_info("Updating relay information to node "
					<< route.node.toString()
					<< " reducing to " << message->getNumHops() << " hops.");
				route.hops = message->getNumHops();
				route.link = ld->overlayId;
			}
			return;
		}
	}

	// not found-> add new entry
	relay_route route;
	route.hops = message->getNumHops();
	route.link = ld->overlayId;
	route.node = message->getSourceNode();
	route.used = time(NULL);
	logging_info("Remembering relay information to node " << route.node.toString());
	relay_routes.push_back(route);
}

/// returns a known "vital" relay link which is up and running
LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
	// try to find source node
	BOOST_FOREACH( relay_route& route, relay_routes ) {
		if (route.node == remote ) {
			LinkDescriptor* ld = getDescriptor( route.link );
			if (ld==NULL || !ld->up) return NULL; else {
				route.used = time(NULL);
				return ld;
			}
		}
	}
	return NULL;
}

/* *****************************************************************************
 * PUBLIC MEMBERS
 * ****************************************************************************/

use_logging_cpp(BaseOverlay);

// ----------------------------------------------------------------------------

BaseOverlay::BaseOverlay() :
	bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
	spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
	sideport(&SideportListener::DEFAULT), started(false), counter(0) {
}

BaseOverlay::~BaseOverlay() {
}

// ----------------------------------------------------------------------------

void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
	logging_info("Starting...");

	// set parameters
	bc = &_basecomm;
	nodeId = _nodeid;

	// register at base communication
	bc->registerMessageReceiver( this );
	bc->registerEventListener( this );

	// timer for auto link management
	Timer::setInterval( 1000 );
	Timer::start();

	started = true;
	state = BaseOverlayStateInvalid;
}

void BaseOverlay::stop() {
	logging_info("Stopping...");

	// stop timer
	Timer::stop();

	// delete oberlay interface
	if(overlayInterface != NULL) {
		delete overlayInterface;
		overlayInterface = NULL;
	}

	// unregister at base communication
	bc->unregisterMessageReceiver( this );
	bc->unregisterEventListener( this );

	started = false;
	state = BaseOverlayStateInvalid;
}

bool BaseOverlay::isStarted(){
	return started;
}

// ----------------------------------------------------------------------------

void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
		const EndpointDescriptor& bootstrapEp) {

	if(id != spovnetId){
		logging_error("attempt to join against invalid spovnet, call initiate first");
		return;
	}


	//ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
	logging_info( "Starting to join spovnet " << id.toString() <<
			" with nodeid " << nodeId.toString());

	if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){

		// bootstrap against ourselfs
		logging_debug("joining spovnet locally");

		overlayInterface->joinOverlay();
		state = BaseOverlayStateCompleted;
		BOOST_FOREACH( NodeListener* i, nodeListeners )
			i->onJoinCompleted( spovnetId );

		//ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
		//ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );

		logging_debug("starting overlay bootstrap module");
		overlayBootstrap.start(this, spovnetId, nodeId);
		overlayBootstrap.publish(bc->getEndpointDescriptor());

	} else {

		// bootstrap against another node
		logging_debug("joining spovnet remotely against " << bootstrapEp.toString());

		const LinkID& lnk = bc->establishLink( bootstrapEp );
		bootstrapLinks.push_back(lnk);
		logging_info("join process initiated for " << id.toString() << "...");
	}
}

void BaseOverlay::leaveSpoVNet() {

	logging_info( "Leaving spovnet " << spovnetId );
	bool ret = ( state != this->BaseOverlayStateInvalid );

	logging_debug("stopping overlay bootstrap module");
	overlayBootstrap.stop();
	overlayBootstrap.revoke();

	logging_debug( "Dropping all auto-links" );

	// gather all service links
	vector<LinkID> servicelinks;
	BOOST_FOREACH( LinkDescriptor* ld, links ) {
		if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
		servicelinks.push_back( ld->overlayId );
	}

	// drop all service links
	BOOST_FOREACH( LinkID lnk, servicelinks )
		dropLink( lnk );

	// let the node leave the spovnet overlay interface
	logging_debug( "Leaving overlay" );
	if( overlayInterface != NULL )
		overlayInterface->leaveOverlay();

	// drop still open bootstrap links
	BOOST_FOREACH( LinkID lnk, bootstrapLinks )
		bc->dropLink( lnk );

	// change to inalid state
	state = BaseOverlayStateInvalid;
	//ovl.visShutdown( ovlId, nodeId, string("") );

	// inform all registered services of the event
	BOOST_FOREACH( NodeListener* i, nodeListeners ) {
		if( ret ) i->onLeaveCompleted( spovnetId );
		else i->onLeaveFailed( spovnetId );
	}
}

void BaseOverlay::createSpoVNet(const SpoVNetID& id,
		const OverlayParameterSet& param,
		const SecurityParameterSet& sec,
		const QoSParameterSet& qos) {

	// set the state that we are an initiator, this way incoming messages are
	// handled correctly
	logging_info( "creating spovnet " + id.toString() <<
			" with nodeid " << nodeId.toString() );

	spovnetId = id;

	overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
	if( overlayInterface == NULL ) {
		logging_fatal( "overlay structure not supported" );
		state = BaseOverlayStateInvalid;

		BOOST_FOREACH( NodeListener* i, nodeListeners )
			i->onJoinFailed( spovnetId );

		return;
	}
}

// ----------------------------------------------------------------------------

const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
	const NodeID& remoteId, const ServiceID& service ) {

	// establish link via overlay
	if (!remoteId.isUnspecified())
		return establishLink( remoteId, service );
	else

	// establish link directly if only ep is known
	if (remoteId.isUnspecified())
		return establishDirectLink(remoteEp, service );

}

/// call base communication's establish link and add link mapping
const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
	const ServiceID& service ) {

	/// find a service listener
	if( !communicationListeners.contains( service ) ) {
		logging_error( "No listener registered for service id=" << service.toString() );
		return LinkID::UNSPECIFIED;
	}
	CommunicationListener* listener = communicationListeners.get( service );
	assert( listener != NULL );

	// create descriptor
	LinkDescriptor* ld = addDescriptor();
	ld->relayed = false;
	ld->listener = listener;
	ld->service = service;
	ld->communicationId = bc->establishLink( ep );

	/// establish link and add mapping
	logging_info("Establishing direct link " << ld->communicationId.toString()
		<< " using " << ep.toString());

	return ld->communicationId;
}

/// establishes a link between two arbitrary nodes
const LinkID BaseOverlay::establishLink( const NodeID& remote,
	const ServiceID& service ) {

	// do not establish a link to myself!
	if (remote == nodeId) return LinkID::UNSPECIFIED;

	// create a link descriptor
	LinkDescriptor* ld = addDescriptor();
	ld->relayed = true;
	ld->remoteNode = remote;
	ld->service = service;
	ld->listener = getListener(ld->service);

	// create link request message
	OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
	msg.setSourceLink(ld->overlayId);
	msg.setRelayed(true);

	// debug message
	logging_info(
		"Sending link request with"
		<< " link=" << ld->overlayId.toString()
		<< " node=" << ld->remoteNode.toString()
		<< " serv=" << ld->service.toString()
	);

	// sending message to node
	send_node( &msg, ld->remoteNode, ld->service );

	return ld->overlayId;
}

/// drops an established link
void BaseOverlay::dropLink(const LinkID& link) {
	logging_info( "Dropping link (initiated locally):" << link.toString() );

	// find the link item to drop
	LinkDescriptor* ld = getDescriptor(link);
	if( ld == NULL ) {
		logging_warn( "Can't drop link, link is unknown!");
		return;
	}

	// delete all queued messages
	if( ld->messageQueue.size() > 0 ) {
		logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
				<< ld->messageQueue.size() << " waiting messages" );
		ld->flushQueue();
	}

	// inform sideport and listener
	ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
	sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );

	// do not drop relay links
	if (!ld->relaying) {
		// drop the link in base communication
		if (ld->communicationUp) bc->dropLink( ld->communicationId );

		// erase descriptor
		eraseDescriptor( ld->overlayId );
	} else {
		ld->dropAfterRelaying = true;
	}
}

// ----------------------------------------------------------------------------

/// internal send message, always use this functions to send messages over links
seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
	logging_debug( "Sending data message on link " << link.toString() );

	// get the mapping for this link
	LinkDescriptor* ld = getDescriptor(link);
	if( ld == NULL ) {
		logging_error("Could not send message. "
			<< "Link not found id=" << link.toString());
		return -1;
	}

	// check if the link is up yet, if its an auto link queue message
	if( !ld->up ) {
		ld->setAutoUsed();
		if( ld->autolink ) {
			logging_info("Auto-link " << link.toString() << " not up, queue message");
			Data data = data_serialize( message );
			const_cast<Message*>(message)->dropPayload();
			ld->messageQueue.push_back( new Message(data) );
		} else {
			logging_error("Link " << link.toString() << " not up, drop message");
		}
		return -1;
	}

	// compile overlay message (has service and node id)
	OverlayMsg overmsg( OverlayMsg::typeData );
	overmsg.encapsulate( const_cast<Message*>(message) );

	// send message over relay/direct/overlay
	return send_link( &overmsg, ld->overlayId );
}

seqnum_t BaseOverlay::sendMessage(const Message* message,
	const NodeID& node, const ServiceID& service) {

	// find link for node and service
	LinkDescriptor* ld = getAutoDescriptor( node, service );

	// if we found no link, create an auto link
	if( ld == NULL ) {

		// debug output
		logging_info( "No link to send message to node "
			<< node.toString() << " found for service "
			<< service.toString() << ". Creating auto link ..."
		);

		// call base overlay to create a link
		LinkID link = establishLink( node, service );
		ld = getDescriptor( link );
		if( ld == NULL ) {
			logging_error( "Failed to establish auto-link.");
			return -1;
		}
		ld->autolink = true;

		logging_debug( "Auto-link establishment in progress to node "
				<< node.toString() << " with link id=" << link.toString() );
	}
	assert(ld != NULL);

	// mark the link as used, as we now send a message through it
	ld->setAutoUsed();

	// send / queue message
	return sendMessage( message, ld->overlayId );
}

// ----------------------------------------------------------------------------

const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
		const LinkID& link) const {

	// return own end-point descriptor
	if( link == LinkID::UNSPECIFIED )
		return bc->getEndpointDescriptor();

	// find link descriptor. not found -> return unspecified
	const LinkDescriptor* ld = getDescriptor(link);
	if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();

	// return endpoint-descriptor from base communication
	return bc->getEndpointDescriptor( ld->communicationId );
}

const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
		const NodeID& node) const {

	// return own end-point descriptor
	if( node == nodeId || node == NodeID::UNSPECIFIED )
		return bc->getEndpointDescriptor();

	// no joined and request remote descriptor? -> fail!
	if( overlayInterface == NULL ) {
		logging_error( "overlay interface not set, cannot resolve endpoint" );
		return EndpointDescriptor::UNSPECIFIED();
	}

	// resolve end-point descriptor from the base-overlay routing table
	return overlayInterface->resolveNode( node );
}

// ----------------------------------------------------------------------------

bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
	sideport = _sideport;
	_sideport->configure( this );
}

bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
	sideport = &SideportListener::DEFAULT;
}

// ----------------------------------------------------------------------------

bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
	logging_debug( "binding communication listener " << listener
			<< " on serviceid " << sid.toString() );

	if( communicationListeners.contains( sid ) ) {
		logging_error( "some listener already registered for service id "
				<< sid.toString() );
		return false;
	}

	communicationListeners.registerItem( listener, sid );
	return true;
}


bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
	logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );

	if( !communicationListeners.contains( sid ) ) {
		logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
		return false;
	}

	if( communicationListeners.get(sid) != listener ) {
		logging_warn( "listener bound to service id " << sid.toString()
				<< " is different than listener trying to unbind" );
		return false;
	}

	communicationListeners.unregisterItem( sid );
	return true;
}

// ----------------------------------------------------------------------------

bool BaseOverlay::bind(NodeListener* listener) {
	logging_debug( "Binding node listener " << listener );

	// already bound? yes-> warning
	NodeListenerVector::iterator i =
		find( nodeListeners.begin(), nodeListeners.end(), listener );
	if( i != nodeListeners.end() ) {
		logging_warn("Node listener " << listener << " is already bound!" );
		return false;
	}

	// no-> add
	nodeListeners.push_back( listener );
	return true;
}

bool BaseOverlay::unbind(NodeListener* listener) {
	logging_debug( "Unbinding node listener " << listener );

	// already unbound? yes-> warning
	NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
	if( i == nodeListeners.end() ) {
		logging_warn( "Node listener " << listener << " is not bound!" );
		return false;
	}

	// no-> remove
	nodeListeners.erase( i );
	return true;
}

// ----------------------------------------------------------------------------

void BaseOverlay::onLinkUp(const LinkID& id,
	const address_v* local, const address_v* remote) {
	logging_debug( "Link up with base communication link id=" << id );

	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);

	// handle bootstrap link we initiated
	if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
		logging_info(
			"Join has been initiated by me and the link is now up. " <<
			"Sending out join request for SpoVNet " << spovnetId.toString()
		);

		// send join request message
		OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
			OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
		JoinRequest joinRequest( spovnetId, nodeId );
		overlayMsg.encapsulate( &joinRequest );
		bc->sendMessage( id, &overlayMsg );
		return;
	}

	// no link found? -> link establishment from remote, add one!
	if (ld == NULL) {
		ld = addDescriptor( id );
		logging_info( "onLinkUp (remote request) descriptor: " << ld );

		// update descriptor
		ld->fromRemote = true;
		ld->communicationId = id;
		ld->communicationUp = true;
		ld->setAutoUsed();
		ld->setAlive();

		// in this case, do not inform listener, since service it unknown
		// -> wait for update message!

	// link mapping found? -> send update message with node-id and service id
	} else {
		logging_info( "onLinkUp descriptor (initiated locally):" << ld );

		// update descriptor
		ld->setAutoUsed();
		ld->setAlive();
		ld->communicationUp = true;
		ld->fromRemote = false;

		// if link is a relayed link->convert to direct link
		if (ld->relayed) {
			logging_info( "Converting to direct link: " << ld );
			ld->up = true;
			ld->relayed = false;
			OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
			overMsg.setSourceLink( ld->overlayId );
			overMsg.setDestinationLink( ld->remoteLink );
			send_link( &overMsg, ld->overlayId );
		} else {
			// note: necessary to validate the link on the remote side!
			logging_info( "Sending out update" <<
				" for service " << ld->service.toString() <<
				" with local node id " << nodeId.toString() <<
				" on link " << ld->overlayId.toString() );

			// compile and send update message
			OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
			overlayMsg.setSourceLink(ld->overlayId);
			overlayMsg.setAutoLink( ld->autolink );
			send_link( &overlayMsg, ld->overlayId, true );
		}
	}
}

void BaseOverlay::onLinkDown(const LinkID& id,
	const address_v* local, const address_v* remote) {

	// erase bootstrap links
	vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
	if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );

	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);
	if ( ld == NULL ) return; // not found? ->ignore!
	logging_info( "onLinkDown descriptor: " << ld );

	// removing relay link information
	removeRelayLink(ld->overlayId);

	// inform listeners about link down
	ld->communicationUp = false;
	if (!ld->service.isUnspecified()) {
		getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
		sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
	}

	// delete all queued messages (auto links)
	if( ld->messageQueue.size() > 0 ) {
		logging_warn( "Dropping link " << id.toString() << " that has "
				<< ld->messageQueue.size() << " waiting messages" );
		ld->flushQueue();
	}

	// erase mapping
	eraseDescriptor(ld->overlayId);
}

void BaseOverlay::onLinkChanged(const LinkID& id,
	const address_v* oldlocal, const address_v* newlocal,
	const address_v* oldremote, const address_v* newremote) {

	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);
	if ( ld == NULL ) return; // not found? ->ignore!
	logging_debug( "onLinkChanged descriptor: " << ld );

	// inform listeners
	ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
	sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );

	// autolinks: refresh timestamp
	ld->setAutoUsed();
}

void BaseOverlay::onLinkFail(const LinkID& id,
	const address_v* local, const address_v* remote) {
	logging_debug( "Link fail with base communication link id=" << id );

	// erase bootstrap links
	vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
	if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );

	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);
	if ( ld == NULL ) return; // not found? ->ignore!
	logging_debug( "Link failed id=" << ld->overlayId.toString() );

	// inform listeners
	ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
	sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
}

void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
	const address_v* remote, const QoSParameterSet& qos) {
	logging_debug( "Link quality changed with base communication link id=" << id );

	// get descriptor for link
	LinkDescriptor* ld = getDescriptor(id, true);
	if ( ld == NULL ) return; // not found? ->ignore!
	logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
}

bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
	const address_v* remote ) {
	logging_debug("Accepting link request from " << remote->to_string() );
	return true;
}

/// handles a message from base communication
bool BaseOverlay::receiveMessage(const Message* message,
		const LinkID& link, const NodeID& ) {
	// get descriptor for link
	LinkDescriptor* ld = getDescriptor( link, true );
	return handleMessage( message, ld, link );
}

// ----------------------------------------------------------------------------

/// Handle spovnet instance join requests
bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {

	// decapsulate message
	JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
	logging_info( "Received join request for spovnet " <<
			joinReq->getSpoVNetID().toString() );

	// check spovnet id
	if( joinReq->getSpoVNetID() != spovnetId ) {
		logging_error(
			"Received join request for spovnet we don't handle " <<
			joinReq->getSpoVNetID().toString() );
		return false;
	}

	// TODO: here you can implement mechanisms to deny joining of a node
	bool allow = true;
	logging_info( "Sending join reply for spovnet " <<
			spovnetId.toString() << " to node " <<
			overlayMsg->getSourceNode().toString() <<
			". Result: " << (allow ? "allowed" : "denied") );
	joiningNodes.push_back( overlayMsg->getSourceNode() );

	// return overlay parameters
	assert( overlayInterface != NULL );
	logging_debug( "Using bootstrap end-point "
		<< getEndpointDescriptor().toString() )
	OverlayParameterSet parameters = overlayInterface->getParameters();
	OverlayMsg retmsg( OverlayMsg::typeJoinReply,
		OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
	JoinReply replyMsg( spovnetId, parameters,
			allow, getEndpointDescriptor() );
	retmsg.encapsulate(&replyMsg);
	bc->sendMessage( bcLink, &retmsg );

	return true;
}

/// Handle replies to spovnet instance join requests
bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
	// decapsulate message
	logging_debug("received join reply message");
	JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();

	// correct spovnet?
	if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
		logging_error( "Received SpoVNet join reply for " <<
				replyMsg->getSpoVNetID().toString() <<
				" != " << spovnetId.toString() );
		delete replyMsg;
		return false;
	}

	// access granted? no -> fail
	if( !replyMsg->getJoinAllowed() ) {
		logging_error( "Our join request has been denied" );

		// drop initiator link
		if( !bcLink.isUnspecified() ){
			bc->dropLink( bcLink );

			vector<LinkID>::iterator it = std::find(
					bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
			if( it != bootstrapLinks.end() )
				bootstrapLinks.erase(it);
		}

		// inform all registered services of the event
		BOOST_FOREACH( NodeListener* i, nodeListeners )
			i->onJoinFailed( spovnetId );

		delete replyMsg;
		return true;
	}

	// access has been granted -> continue!
	logging_info("Join request has been accepted for spovnet " <<
			spovnetId.toString() );

	logging_debug( "Using bootstrap end-point "
		<< replyMsg->getBootstrapEndpoint().toString() );

	// create overlay structure from spovnet parameter set
	// if we have not boostrapped yet against some other node
	if( overlayInterface == NULL ){

		logging_debug("first-time bootstrapping");

		overlayInterface = OverlayFactory::create(
			*this, replyMsg->getParam(), nodeId, this );

		// overlay structure supported? no-> fail!
		if( overlayInterface == NULL ) {
			logging_error( "overlay structure not supported" );

			if( !bcLink.isUnspecified() ){
				bc->dropLink( bcLink );

				vector<LinkID>::iterator it = std::find(
						bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
				if( it != bootstrapLinks.end() )
					bootstrapLinks.erase(it);
			}

			// inform all registered services of the event
			BOOST_FOREACH( NodeListener* i, nodeListeners )
			i->onJoinFailed( spovnetId );

			delete replyMsg;
			return true;
		}

		// everything ok-> join the overlay!
		state = BaseOverlayStateCompleted;
		overlayInterface->createOverlay();

		overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
		overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );

		// update ovlvis
		//ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);

		// inform all registered services of the event
		BOOST_FOREACH( NodeListener* i, nodeListeners )
			i->onJoinCompleted( spovnetId );

		delete replyMsg;

	} else {

		// this is not the first bootstrap, just join the additional node
		logging_debug("not first-time bootstrapping");
		overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
		overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );

		delete replyMsg;

	} // if( overlayInterface == NULL )

	return true;
}


bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
	// get service
	const ServiceID& service = overlayMsg->getService();
	logging_debug( "Received data for service " << service.toString()
		<< " on link " << overlayMsg->getDestinationLink().toString() );

	// delegate data message
	getListener(service)->onMessage(
		overlayMsg,
		overlayMsg->getSourceNode(),
		overlayMsg->getDestinationLink()
	);

	return true;
}


bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {

	if( ld == NULL ) {
		logging_warn( "received overlay update message for link for "
				<< "which we have no mapping" );
		return false;
	}
	logging_info("Received type update message on link " << ld );

	// update our link mapping information for this link
	bool changed =
		( ld->remoteNode != overlayMsg->getSourceNode() )
		|| ( ld->service != overlayMsg->getService() );

	// set parameters
	ld->up         = true;
	ld->remoteNode = overlayMsg->getSourceNode();
	ld->remoteLink = overlayMsg->getSourceLink();
	ld->service    = overlayMsg->getService();
	ld->autolink   = overlayMsg->isAutoLink();

	// if our link information changed, we send out an update, too
	if( changed ) {
		overlayMsg->swapRoles();
		overlayMsg->setSourceNode(nodeId);
		overlayMsg->setSourceLink(ld->overlayId);
		overlayMsg->setService(ld->service);
		send( overlayMsg, ld );
	}

	// service registered? no-> error!
	if( !communicationListeners.contains( ld->service ) ) {
		logging_warn( "Link up: event listener has not been registered" );
		return false;
	}

	// default or no service registered?
	CommunicationListener* listener = communicationListeners.get( ld->service );
	if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
		logging_warn("Link up: event listener is default or null!" );
		return true;
	}

	// update descriptor
	ld->listener = listener;
	ld->setAutoUsed();
	ld->setAlive();

	// ask the service whether it wants to accept this link
	if( !listener->onLinkRequest(ld->remoteNode) ) {

		logging_debug("Link id=" << ld->overlayId.toString() <<
			" has been denied by service " << ld->service.toString() << ", dropping link");

		// prevent onLinkDown calls to the service
		ld->listener = &CommunicationListener::DEFAULT;

		// drop the link
		dropLink( ld->overlayId );
		return true;
	}

	// set link up
	ld->up = true;
	logging_info( "Link has been accepted by service and is up: " << ld );

	// auto links: link has been accepted -> send queued messages
	if( ld->messageQueue.size() > 0 ) {
		logging_info( "Sending out queued messages on link " << ld );
		BOOST_FOREACH( Message* msg, ld->messageQueue ) {
			sendMessage( msg, ld->overlayId );
			delete msg;
		}
		ld->messageQueue.clear();
	}

	// call the notification functions
	listener->onLinkUp( ld->overlayId, ld->remoteNode );
	sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );

	return true;
}

/// handle a link request and reply
bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
	logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );

	//TODO: Check if a request has already been sent using getSourceLink() ...

	// create link descriptor
	LinkDescriptor* ldn = addDescriptor();

	// flags
	ldn->up = true;
	ldn->fromRemote = true;
	ldn->relayed = true;

	// parameters
	ldn->service = overlayMsg->getService();
	ldn->listener = getListener(ldn->service);
	ldn->remoteNode = overlayMsg->getSourceNode();
	ldn->remoteLink = overlayMsg->getSourceLink();

	// update time-stamps
	ldn->setAlive();
	ldn->setAutoUsed();

	// create reply message and send back!
	overlayMsg->swapRoles(); // swap source/destination
	overlayMsg->setType(OverlayMsg::typeLinkReply);
	overlayMsg->setSourceLink(ldn->overlayId);
	overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
	overlayMsg->setRelayed(true);
	send( overlayMsg, ld ); // send back to link

	// inform listener
	ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );

	return true;
}

bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {

	// find link request
	LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());

	// not found? yes-> drop with error!
	if (ldn == NULL) {
		logging_error( "No link request pending for "
			<< overlayMsg->getDestinationLink().toString() );
		return false;
	}
	logging_debug("Handling link reply for " << ldn )

	// check if already up
	if (ldn->up) {
		logging_warn( "Link already up: " << ldn );
		return true;
	}

	// debug message
	logging_debug( "Link request reply received. Establishing link"
		<< " for service " << overlayMsg->getService().toString()
		<< " with local id=" << overlayMsg->getDestinationLink()
		<< " and remote link id=" << overlayMsg->getSourceLink()
		<< " to " << overlayMsg->getSourceEndpoint().toString()
	);

	// set local link descriptor data
	ldn->up = true;
	ldn->relayed = true;
	ldn->service = overlayMsg->getService();
	ldn->listener = getListener(ldn->service);
	ldn->remoteLink = overlayMsg->getSourceLink();
	ldn->remoteNode = overlayMsg->getSourceNode();

	// update timestamps
	ldn->setAlive();
	ldn->setAutoUsed();

	// auto links: link has been accepted -> send queued messages
	if( ldn->messageQueue.size() > 0 ) {
		logging_info( "Sending out queued messages on link " <<
			ldn->overlayId.toString() );
		BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
			sendMessage( msg, ldn->overlayId );
			delete msg;
		}
		ldn->messageQueue.clear();
	}

	// inform listeners about new link
	ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );

	// try to replace relay link with direct link
	ldn->communicationId =
		bc->establishLink( overlayMsg->getSourceEndpoint() );

	return true;
}

/// handle a keep-alive message for a link
bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
	LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
	if ( rld != NULL ) {
		logging_debug("Keep-Alive for " <<
			overlayMsg->getDestinationLink() );
		if (overlayMsg->isRouteRecord())
			rld->routeRecord = overlayMsg->getRouteRecord();
		rld->setAlive();
		return true;
	} else {
		logging_error("Keep-Alive for "
				<< overlayMsg->getDestinationLink() << ": link unknown." );
		return false;
	}
}

/// handle a direct link message
bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
	logging_debug( "Received direct link replacement request" );

	/// get destination overlay link
	LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
	if (rld == NULL || ld == NULL) {
		logging_error("Direct link replacement: Link "
				<< overlayMsg->getDestinationLink() << "not found error." );
		return false;
	}
	logging_info( "Received direct link convert notification for " << rld );

	// update information
	rld->communicationId = ld->communicationId;
	rld->communicationUp = true;
	rld->relayed = false;

	// mark used and alive!
	rld->setAlive();
	rld->setAutoUsed();

	// erase the original descriptor
	eraseDescriptor(ld->overlayId);
}

/// handles an incoming message
bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
	const LinkID bcLink ) {
	logging_debug( "Handling message: " << message->toString());

	// decapsulate overlay message
	OverlayMsg* overlayMsg =
		const_cast<Message*>(message)->decapsulate<OverlayMsg>();
	if( overlayMsg == NULL ) return false;

	// increase number of hops
	overlayMsg->increaseNumHops();

	// refresh relay information
	refreshRelayInformation( overlayMsg, ld );

	// update route record
	overlayMsg->addRouteRecord(nodeId);

	// handle signaling messages (do not route!)
	if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
		overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
		overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
		delete overlayMsg;
		return true;
	}

	// message for reached destination? no-> route message
	if (!overlayMsg->getDestinationNode().isUnspecified() &&
 		 overlayMsg->getDestinationNode() != nodeId ) {
		logging_debug("Routing message "
			<< " from " << overlayMsg->getSourceNode()
			<< " to " << overlayMsg->getDestinationNode()
		);
		route( overlayMsg );
		delete overlayMsg;
		return true;
	}

	// handle base overlay message
	bool ret = false; // return value
	switch ( overlayMsg->getType() ) {

		// data transport messages
		case OverlayMsg::typeData:
			ret = handleData(overlayMsg, ld); 			break;

		// overlay setup messages
		case OverlayMsg::typeJoinRequest:
			ret = handleJoinRequest(overlayMsg, bcLink ); 	break;
		case OverlayMsg::typeJoinReply:
			ret = handleJoinReply(overlayMsg, bcLink ); 	break;

		// link specific messages
		case OverlayMsg::typeLinkRequest:
			ret = handleLinkRequest(overlayMsg, ld ); 	break;
		case OverlayMsg::typeLinkReply:
			ret = handleLinkReply(overlayMsg, ld ); 	break;
		case OverlayMsg::typeLinkUpdate:
			ret = handleLinkUpdate(overlayMsg, ld );  	break;
		case OverlayMsg::typeLinkAlive:
			ret = handleLinkAlive(overlayMsg, ld );   	break;
		case OverlayMsg::typeLinkDirect:
			ret = handleLinkDirect(overlayMsg, ld );  	break;

		// handle unknown message type
		default: {
			logging_error( "received message in invalid state! don't know " <<
				"what to do with this message of type " << overlayMsg->getType() );
			ret = false;
			break;
		}
	}

	// free overlay message and return value
	delete overlayMsg;
	return ret;
}

// ----------------------------------------------------------------------------

void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {

	logging_debug( "broadcasting message to all known nodes " <<
			"in the overlay from service " + service.toString() );

	OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
	OverlayInterface::NodeList::iterator i = nodes.begin();
	for(; i != nodes.end(); i++ ) {
		if( *i == nodeId) continue; // don't send to ourselfs
		sendMessage( message, *i, service );
	}
}

/// return the overlay neighbors
vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
	// the known nodes _can_ also include our node, so we remove ourself
	vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
	vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
	if( i != nodes.end() ) nodes.erase( i );
	return nodes;
}

const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
	if( lid == LinkID::UNSPECIFIED ) return nodeId;
	const LinkDescriptor* ld = getDescriptor(lid);
	if( ld == NULL ) return NodeID::UNSPECIFIED;
	else return ld->remoteNode;
}

vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
	vector<LinkID> linkvector;
	BOOST_FOREACH( LinkDescriptor* ld, links ) {
		if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
			linkvector.push_back( ld->overlayId );
		}
	}
	return linkvector;
}


void BaseOverlay::onNodeJoin(const NodeID& node) {
	JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
	if( i == joiningNodes.end() ) return;

	logging_info( "node has successfully joined baseoverlay and overlay structure "
			<< node.toString() );

	joiningNodes.erase( i );
}

void BaseOverlay::eventFunction() {
	stabilizeRelays();
	stabilizeLinks();
}

}} // namespace ariba, overlay
