// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [License] #include "BaseCommunication.h" #include "networkinfo/AddressDiscovery.h" #include "ariba/utility/types/PeerID.h" #include "ariba/utility/system/SystemQueue.h" #include #ifdef UNDERLAY_OMNET #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h" #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h" #include "ariba/utility/system/StartupWrapper.h" using ariba::communication::AribaOmnetModule; using ariba::communication::OmnetNetworkProtocol; using ariba::utility::StartupWrapper; #endif namespace ariba { namespace communication { using namespace ariba::addressing2; using ariba::utility::PeerID; using ariba::utility::SystemQueue; use_logging_cpp(BaseCommunication); BaseCommunication::BaseCommunication() : listenOn_endpoints(new addressing2::endpoint_set()), currentSeqnum( 0 ), transport( NULL ), messageReceiver( NULL ), started( false ) { } BaseCommunication::~BaseCommunication(){ } void BaseCommunication::start(EndpointSetPtr listen_on) { assert ( ! started ); listenOn_endpoints = listen_on; logging_info("Setting local end-points: " << listenOn_endpoints->to_string()); logging_info( "Starting up ..." ); currentSeqnum = 0; // creating transports // ---> transport_peer holds the set of the active endpoints we're listening on logging_info( "Creating transports ..." ); transport = new transport_peer(); active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints); logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX logging_info( "Searching for local locators ..." ); local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints); if ( local_endpoints->count() > 0 ) { logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() ); } else { logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!"); // TODO notify application, so that it may react properly. throw exception..? assert( false ); } // create local EndpointDescriptor // ---> localDescriptor hold the set endpoints that can be used to reach us localDescriptor.getPeerId() = PeerID::random(); localDescriptor.replace_endpoint_set(local_endpoints); logging_info( "Using PeerID: " << localDescriptor.getPeerId() ); // start transport_peer transport->register_listener( this ); transport->start(); // bind to the network change detection networkMonitor.registerNotification( this ); // base comm startup done started = true; logging_info( "Started up." ); } void BaseCommunication::stop() { logging_info( "Stopping transports ..." ); transport->stop(); delete transport; started = false; logging_info( "Stopped." ); } bool BaseCommunication::isStarted(){ return started; } const LinkID BaseCommunication::establishLink( const EndpointDescriptor& descriptor, const LinkID& link_id, const QoSParameterSet& qos, const SecurityParameterSet& sec) { // copy link id LinkID linkid = link_id; // debug logging_debug( "Request to establish link" ); // create link identifier if (linkid.isUnspecified()) linkid = LinkID::create(); // create link descriptor logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() ); LinkDescriptor* ld = new LinkDescriptor(); ld->localLink = linkid; addLink( ld ); /* send a message to request new link to remote */ logging_debug( "Send messages with request to open link to " << descriptor.toString() ); /* * Create Link-Request Message: * NOTE: - Their PeerID (in parent message) * - Our LinkID * - Our PeerID * - Our EndpointDescriptor */ reboost::message_t linkmsg; linkmsg.push_back(linkid.serialize()); linkmsg.push_back(localDescriptor.getPeerId().serialize()); linkmsg.push_back(localDescriptor.endpoints->serialize()); // // XXX AKTUELL BUG FINDING... // reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize(); // EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet(); // xxx_set->deserialize(xxx); // cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl; // cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl; // send message // TODO move enum to BaseComm send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg, descriptor, system_priority::OVERLAY); return linkid; } void BaseCommunication::dropLink(const LinkID link) { logging_debug( "Starting to drop link " + link.toString() ); // see if we have the link LinkDescriptor& ld = queryLocalLink( link ); if( ld.isUnspecified() ) { logging_error( "Don't know the link you want to drop "+ link.toString() ); return; } // tell the registered listeners foreach( CommunicationEvents* i, eventListener ) { i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); } // * send message to drop the link * logging_debug( "Sending out link close request. for us, the link is closed now" ); reboost::message_t empty_message; send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY ); // remove from map removeLink(link); } seqnum_t BaseCommunication::sendMessage( const LinkID& lid, reboost::message_t message, uint8_t priority, bool bypass_overlay) throw(communication_message_not_sent) { // message type: direct data or (normal) data AribaBaseMsg::type_ type; if ( bypass_overlay ) { type = AribaBaseMsg::typeDirectData; logging_debug( "Sending out direct-message to link " << lid.toString() ); } else { type = AribaBaseMsg::typeData; logging_debug( "Sending out message to link " << lid.toString() ); } // query local link info LinkDescriptor& ld = queryLocalLink(lid); if( ld.isUnspecified() ) { throw communication_message_not_sent("Don't know the link with id " + lid.toString()); } // link not up-> error if( !ld.up ) { throw communication_message_not_sent("Can not send on link " + lid.toString() + ": link not up"); } // * send message * bool okay = send_over_link( type, message, ld, priority ); if ( ! okay ) { throw communication_message_not_sent("send_over_link failed!"); } return ++currentSeqnum; } const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const { if( link.isUnspecified() ){ return localDescriptor; } else { LinkDescriptor& linkDesc = queryLocalLink(link); if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); return linkDesc.remoteDescriptor; } } void BaseCommunication::registerEventListener(CommunicationEvents* _events){ if( eventListener.find( _events ) == eventListener.end() ) eventListener.insert( _events ); } void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){ EventListenerSet::iterator i = eventListener.find( _events ); if( i != eventListener.end() ) eventListener.erase( i ); } /*------------------------------ | ASIO thread --> SystemQueue | ------------------------------*/ /// ASIO thread void BaseCommunication::receive_message(transport_connection::sptr connection, reboost::shared_buffer_t msg) { logging_debug( "Dispatching message" ); SystemQueue::instance().scheduleCall( boost::bind( &BaseCommunication::receiveMessage, this, connection, msg) ); } /// ASIO thread void BaseCommunication::connection_terminated(transport_connection::sptr connection) { SystemQueue::instance().scheduleCall( boost::bind( &BaseCommunication::connectionTerminated, this, connection) ); } /*-------------------------------- | [ASIO thread --> SystemQueue] | -------------------------------*/ /// ARIBA thread (System Queue) void BaseCommunication::connectionTerminated(transport_connection::sptr connection) { vector links = connection->get_communication_links(); logging_debug("[BaseCommunication] Connection terminated: " << connection->getLocalEndpoint()->to_string() << " <--> " << connection->getRemoteEndpoint()->to_string() << " (" << links.size() << " links)"); // remove all links that used the terminated connection for ( vector::iterator it = links.begin(); it != links.end(); ++it ) { LinkID& link_id = **it; logging_debug(" ---> Removing link: " << link_id.toString()); // searching for link, not found-> warn LinkDescriptor& linkDesc = queryLocalLink( link_id ); if (linkDesc.isUnspecified()) { logging_warn("Failed to find local link " << link_id.toString()); continue; } // inform listeners foreach( CommunicationEvents* i, eventListener ){ i->onLinkFail( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator ); } // remove the link descriptor removeLink( link_id ); } } /// ARIBA thread (System Queue) void BaseCommunication::receiveMessage(transport_connection::sptr connection, reboost::shared_buffer_t message) { // XXX logging_debug("/// [receiveMessage] buffersize: " << message.size()); // get type uint8_t type = message.data()[0]; reboost::shared_buffer_t sub_buff = message(1); // get link id LinkID link_id; if ( type != AribaBaseMsg::typeLinkRequest) { sub_buff = link_id.deserialize(sub_buff); } // handle message switch ( type ) { // --------------------------------------------------------------------- // data message // --------------------------------------------------------------------- case AribaBaseMsg::typeData: { logging_debug( "Received data message, forwarding to overlay." ); if( messageReceiver != NULL ) { messageReceiver->receiveMessage( sub_buff, link_id, NodeID::UNSPECIFIED, false ); } break; } // --------------------------------------------------------------------- // direct data message (bypass overlay-layer) // --------------------------------------------------------------------- case AribaBaseMsg::typeDirectData: { logging_debug( "Received direct data message, forwarding to application." ); if( messageReceiver != NULL ) { messageReceiver->receiveMessage( sub_buff, link_id, NodeID::UNSPECIFIED, true ); } break; } // --------------------------------------------------------------------- // handle link request from remote // --------------------------------------------------------------------- case AribaBaseMsg::typeLinkRequest: { logging_debug( "Received link open request on " << connection->getLocalEndpoint()->to_string() ); /* * Deserialize Peer Message * - Our PeerID */ PeerID our_peer_id; sub_buff = our_peer_id.deserialize(sub_buff); /// not the correct peer id-> skip request if ( our_peer_id != localDescriptor.getPeerId() && ! our_peer_id.isUnspecified() /* overlay bootstrap */ ) { logging_info("Received link request for " << our_peer_id.toString() << "but i'm " << localDescriptor.getPeerId() << ": Ignoring!"); // TODO terminate connection? break; } /* * Deserialize Link-Request Message: * - Their LinkID * - Their PeerID * - Their EndpointDescriptor */ LinkID their_link_id; PeerID their_peer_id; EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); sub_buff = their_link_id.deserialize(sub_buff); sub_buff = their_peer_id.deserialize(sub_buff); sub_buff = their_endpoints->deserialize(sub_buff); /* [ Deserialize Link-Request Message ] */ /// only answer the first request if (!queryRemoteLink(their_link_id).isUnspecified()) { // TODO aktuell: When will these connections be closed? // ---> Close it now (if it services no links) ? // (see also ! allowlink below) // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only! if ( connection->get_communication_links().size() == 0 ) { connection->terminate(); } logging_debug("Link request already received. Ignore!"); break; } /// create link ids LinkID localLink = LinkID::create(); LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary logging_debug( "local=" << connection->getLocalEndpoint()->to_string() << " remote=" << connection->getRemoteEndpoint()->to_string() ); // check if link creation is allowed by ALL listeners bool allowlink = true; foreach( CommunicationEvents* i, eventListener ){ allowlink &= i->onLinkRequest( localLink, connection->getLocalEndpoint(), connection->getRemoteEndpoint()); } // not allowed-> warn if( !allowlink ){ logging_warn( "Overlay denied creation of link" ); return; } // create descriptor LinkDescriptor* ld = new LinkDescriptor(); ld->localLink = localLink; ld->remoteLink = remoteLink; ld->localLocator = connection->getLocalEndpoint(); ld->remoteLocator = connection->getRemoteEndpoint(); ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints); ld->set_connection(connection); // update endpoints (should only have any effect in case of NAT) ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); // localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0 // link is now up-> add it ld->up = true; addLink(ld); /* sending link reply */ logging_debug( "Sending link reply with ids " << "local=" << localLink.toString() << ", " << "remote=" << remoteLink.toString() ); /* * Create Link-Reply Message: * - Our LinkID * - Our Endpoint_Set (as update) * - Their EndpointDescriptor (maybe they learn something about NAT) */ reboost::message_t linkmsg; linkmsg.push_back(localLink.serialize()); linkmsg.push_back(localDescriptor.endpoints->serialize()); linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize()); // XXX cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl; // send message bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY ); if ( ! sent ) { logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string()); // TODO remove link, close link, ..? break; } // link is up! logging_debug( "Link (initiated from remote) is up with " << "local(id=" << ld->localLink.toString() << "," << "locator=" << ld->localLocator->to_string() << ") " << "remote(id=" << ld->remoteLink.toString() << ", " << "locator=" << ld->remoteLocator->to_string() << ")" ); // inform listeners about new open link foreach( CommunicationEvents* i, eventListener ) { i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); } // done break; } // --------------------------------------------------------------------- // handle link request reply // --------------------------------------------------------------------- case AribaBaseMsg::typeLinkReply: { logging_debug( "Received link open reply for a link we initiated" ); /* * Deserialize Link-Reply Message: * - Their LinkID * - Their Endpoint_Set (as update) * - Our EndpointDescriptor (maybe we can learn something about NAT) */ LinkID their_link_id; EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet(); sub_buff = their_link_id.deserialize(sub_buff); sub_buff = their_endpoints->deserialize(sub_buff); sub_buff = our_endpoints->deserialize(sub_buff); // this is a reply to a link open request, so we have already // a link mapping and can now set the remote link to valid LinkDescriptor& ld = queryLocalLink( link_id ); // no link found-> warn! if (ld.isUnspecified()) { logging_warn("Failed to find local link " << link_id.toString()); return; } if ( ld.up ) { logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString()); // TODO send LinkClose ? return; } // store the connection ld.set_connection(connection); // set remote locator and link id ld.remoteLink = their_link_id; ld.remoteLocator = connection->getRemoteEndpoint(); /* Update endpoints */ // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information. ld.remoteDescriptor.replace_endpoint_set(their_endpoints); // add actual remote endpoint to this set (should only have any effect in case of NAT) ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); // TODO In case of NAT, we could learn something about our external IP. // ---> But we must trust the remote peer about this information!! // localDescriptor.endpoints->add_endpoints(our_endpoints); ld.up = true; logging_debug( "Link is now up with local id " << ld.localLink.toString() << " and remote id " << ld.remoteLink.toString() ); // inform lisneters about link up event foreach( CommunicationEvents* i, eventListener ){ i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); } // done break; } // --------------------------------------------------------------------- // handle link close requests // --------------------------------------------------------------------- case AribaBaseMsg::typeLinkClose: { // get remote link // const LinkID& localLink = msg.getRemoteLink(); logging_debug( "Received link close request for link " << link_id.toString() ); // searching for link, not found-> warn LinkDescriptor& linkDesc = queryLocalLink( link_id ); if (linkDesc.isUnspecified()) { logging_warn("Failed to find local link " << link_id.toString()); return; } // inform listeners foreach( CommunicationEvents* i, eventListener ){ i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator ); } // remove the link descriptor removeLink( link_id ); // done break; } // --------------------------------------------------------------------- // handle link locator changes -- TODO is this ever called..? // --------------------------------------------------------------------- // case AribaBaseMsg::typeLinkUpdate: { // const LinkID& localLink = msg.getRemoteLink(); // logging_debug( "Received link update for link " // << localLink.toString() ); // // // find the link description // LinkDescriptor& linkDesc = queryLocalLink( localLink ); // if (linkDesc.isUnspecified()) { // logging_warn("Failed to update local link " // << localLink.toString()); // return; // } // // // update the remote locator // addressing2::EndpointPtr oldremote = linkDesc.remoteLocator; // linkDesc.remoteLocator = connection->getRemoteEndpoint(); // // // TODO update linkDesc.connection ? // // // inform the listeners (local link has _not_ changed!) // foreach( CommunicationEvents* i, eventListener ){ // i->onLinkChanged( // linkDesc.localLink, // linkid // linkDesc.localLocator, // old local // linkDesc.localLocator, // new local // oldremote, // old remote // linkDesc.remoteLocator // new remote // ); // } // // // done // break; // } default: { logging_warn( "Received unknown message type!" ); break; } } } /// add a newly allocated link to the set of links void BaseCommunication::addLink( LinkDescriptor* link ) { linkSet.push_back( link ); } /// remove a link from set void BaseCommunication::removeLink( const LinkID& localLink ) { for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ if( (*i)->localLink != localLink) continue; // remove_endpoint((*i)->remoteLocator); // XXX delete *i; linkSet.erase( i ); break; } } /// query a descriptor by local link id BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const { for (size_t i=0; ilocalLink == link) return (LinkDescriptor&)*linkSet[i]; return LinkDescriptor::UNSPECIFIED(); } /// query a descriptor by remote link id BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const { for (size_t i=0; iremoteLink == link) return (LinkDescriptor&)*linkSet[i]; return LinkDescriptor::UNSPECIFIED(); } //LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { // LinkIDs ids; // for (size_t i=0; ilocalLink ); // } else { // if ( *linkSet[i]->remoteLocator == *addr ) // ids.push_back( linkSet[i]->localLink ); // } // } // return ids; //} void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){ /*- disabled! // we only care about address changes, not about interface changes // as address changes are triggered by interface changes, we are safe here if( info.type != NetworkChangeInterface::EventTypeAddressNew && info.type != NetworkChangeInterface::EventTypeAddressDelete ) return; logging_info( "base communication is handling network address changes" ); // get all now available addresses NetworkInformation networkInformation; AddressInformation addressInformation; NetworkInterfaceList interfaces = networkInformation.getInterfaces(); AddressList addresses; for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){ AddressList newaddr = addressInformation.getAddresses(*i); addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() ); } // // get current locators for the local endpoint // TODO: this code is dublicate of the ctor code!!! cleanup! // NetworkProtocol::NetworkLocatorSet locators = network->getAddresses(); NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin(); NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end(); // // remember the old local endpoint, in case it changes // EndpointDescriptor oldLocalDescriptor( localDescriptor ); // // look for local locators that we can use in communication // // choose the first locator that is not localhost // bool foundLocator = false; bool changedLocator = false; for( ; i != iend; i++){ logging_debug( "local locator found " << (*i)->toString() ); IPv4Locator* ipv4locator = dynamic_cast(*i); if( *ipv4locator != IPv4Locator::LOCALHOST && *ipv4locator != IPv4Locator::ANY && *ipv4locator != IPv4Locator::BROADCAST ){ ipv4locator->setPort( listenport ); changedLocator = *localDescriptor.locator != *ipv4locator; localDescriptor.locator = ipv4locator; logging_info( "binding to addr = " << ipv4locator->toString() ); foundLocator = true; break; } } // for( ; i != iend; i++) // // if we found no locator, bind to localhost // if( !foundLocator ){ changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST; localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST ); ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport ); logging_info( "found no good local lcoator, binding to addr = " << localDescriptor.locator->toString() ); } // // if we have connections that have no more longer endpoints // close these. they will be automatically built up again. // also update the local locator in the linkset mapping // if( changedLocator ){ logging_debug( "local endp locator has changed to " << localDescriptor.toString() << ", resettings connections that end at old locator " << oldLocalDescriptor.toString()); LinkSet::iterator i = linkSet.begin(); LinkSet::iterator iend = linkSet.end(); for( ; i != iend; i++ ){ logging_debug( "checking connection for locator change: " << " local " << (*i).localLocator->toString() << " old " << oldLocalDescriptor.locator->toString() ); if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){ logging_debug("terminating connection to " << (*i).remoteLocator->toString() ); transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator ); (*i).localLocator = localDescriptor.locator; } } // for( ; i != iend; i++ ) // wait 500ms to give the sockets time to shut down usleep( 500000 ); } else { logging_debug( "locator has not changed, not resetting connections" ); } // // handle the connections that have no longer any // valid locator. send update messages with the new // locator, so the remote node updates its locator/link mapping // LinkSet::iterator iAffected = linkSet.begin(); LinkSet::iterator endAffected = linkSet.end(); for( ; iAffected != endAffected; iAffected++ ){ LinkDescriptor descr = *iAffected; logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() ); AribaBaseMsg updateMsg( descr.remoteLocator, AribaBaseMsg::LINK_STATE_UPDATE, descr.localLink, descr.remoteLink ); transport->sendMessage( &updateMsg ); } */ } addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link( const LinkID& linkid) { LinkDescriptor& ld = queryLocalLink(linkid); return ld.get_connection()->getLocalEndpoint(); } addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link( const LinkID& linkid) { LinkDescriptor& ld = queryLocalLink(linkid); return ld.get_connection()->getRemoteEndpoint(); } bool BaseCommunication::send_over_link( const uint8_t type, reboost::message_t message, const LinkDescriptor& desc, const uint8_t priority) { /* * Create Link Message: * - Type * - Their LinkID */ // link id message.push_front(desc.remoteLink.serialize()); // type memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); /* [ Create Link Message ] */ /* send message */ transport_connection::sptr conn = desc.get_connection(); if ( ! conn ) { cout << "/// MARIO: No connection!!" << endl; // XXX debug return false; } // * send over connection * return conn->send(message, priority); } void BaseCommunication::send_to_peer( const uint8_t type, const PeerID& peer_id, reboost::message_t message, const EndpointDescriptor& endpoint, const uint8_t priority ) { /* * Create Peer Message: * - Type * - Their PeerID */ // peer id message.push_front(peer_id.serialize()); // type memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); /* send message */ transport->send(endpoint.getEndpoints(), message, priority); } }} // namespace ariba, communication