// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. 
// [License]

#include "BaseCommunication.h"

#ifdef UNDERLAY_OMNET
#include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
#include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
#include "ariba/utility/system/StartupWrapper.h"
using ariba::communication::AribaOmnetModule;
using ariba::communication::OmnetNetworkProtocol;
using ariba::utility::StartupWrapper;
#endif

namespace ariba {
namespace communication {

// Logging hook for this class; the macro is defined outside this file.
use_logging_cpp(BaseCommunication);

// Definition of the default-constructed sentinel descriptor. The lookup
// results in this file are tested with isUnspecified() to detect
// "link not found".
const BaseCommunication::LinkDescriptor BaseCommunication::LinkDescriptor::UNSPECIFIED;

// Constructs an idle instance: no message receiver, no network protocol,
// no transport, and the started flag cleared. The real setup is done
// in start().
BaseCommunication::BaseCommunication() : messageReceiver(NULL), network(NULL), transport(NULL), basecommStarted(false){
}

// Empty destructor. transport and network are released in stop(),
// not here.
BaseCommunication::~BaseCommunication(){
}

// Brings up base communication.
//
// _locallocator  optional manual override of the local locator; when not
//                NULL it replaces whatever the automatic search selected.
// _listenport    port handed to the transport and set on the
//                automatically selected locator.
//
// Steps, in order: reset the sequence counter, create transport and
// network protocol (OMNeT++ variants when UNDERLAY_OMNET is defined,
// TCPTransport / IPv4NetworkProtocol otherwise), pick a local locator,
// register this object as message receiver on the transport, start the
// transport, subscribe to network change notifications (non-OMNeT build
// only) and finally set the started flag.
void BaseCommunication::start(const NetworkLocator* _locallocator, const uint16_t _listenport){

    currentSeqnum = 0;
    listenport = _listenport;

    logging_info( "starting up base communication and creating transports ..." );
    logging_info( "using port " << listenport );

#ifdef UNDERLAY_OMNET
    // in the simulation build the current module is used as transport
    AribaOmnetModule* module = StartupWrapper::getCurrentModule();
    module->setServerPort( listenport );

    transport = module;
    network = new OmnetNetworkProtocol( module );
#else
    transport = new TCPTransport( listenport );
    network = new IPv4NetworkProtocol();
#endif

    logging_debug( "searching for local locators ..." );

    NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
    NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
    NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();

    //
    // choose the first locator that is not localhost
    //

    bool foundLocator = false;

    for( ; i != iend; i++){
        logging_debug( "local locator found " << (*i)->toString() );
        // NOTE(review): the template argument of this dynamic_cast is
        // missing in this copy of the file (presumably IPv4Locator*).
        // The result is dereferenced below without a NULL check.
        IPv4Locator* ipv4locator = dynamic_cast(*i);

        // TODO: which locators are fine to bind to?
        // localhost is not too bad, works when testing locally
        // with several instances. the manual override currently
        // enables to use an arbitrary address, guess this is fine.
        // so the manual override also can use ANY, LOCALHOST, BROADCAST

        if( *ipv4locator != IPv4Locator::LOCALHOST && *ipv4locator != IPv4Locator::ANY && *ipv4locator != IPv4Locator::BROADCAST ){

            ipv4locator->setPort(listenport);
            localDescriptor.locator = ipv4locator;
            localDescriptor.isUnspec = false;
            logging_info( "binding to addr = " << ipv4locator->toString() );

            foundLocator = true;
            break;
        }
    } // for( ; i != iend; i++)

    // manual override: the previously selected locator is deleted and
    // replaced by a new IPv4Locator parsed from the string form of the
    // given locator. No setPort() call is made on this path, so the port
    // is whatever fromString() produced.
    if( _locallocator != NULL ) {
        if( localDescriptor.locator != NULL) delete localDescriptor.locator;
        localDescriptor.locator = new IPv4Locator( IPv4Locator::fromString( _locallocator->toString()) );
        localDescriptor.isUnspec = false;
        logging_debug( "manual locator override, using locator=" << localDescriptor.locator->toString() );
        foundLocator = true;
    }

    // if we found no local locator, exit using logging fatal
    // NOTE(review): whether logging_fatal actually terminates is not
    // visible here; if it returns, startup continues without a locator.
    if( !foundLocator )
        logging_fatal( "did not find a useable local locator!" );

    transport->addMessageReceiver( this );
    transport->start();

#ifndef UNDERLAY_OMNET
    //
    // bind to the network change detection
    //

    networkMonitor.registerNotification( this );
#endif

    //
    // base comm startup done
    //

    basecommStarted = true;
    logging_info( "base communication started up" );
}

// Shuts base communication down: stops the transport, deletes the
// transport and network objects and clears the started flag.
//
// NOTE(review): transport and network are not reset to NULL after the
// delete. In the UNDERLAY_OMNET build transport is the module returned
// by StartupWrapper::getCurrentModule() rather than an object allocated
// in start() - confirm that deleting it here is intended.
void BaseCommunication::stop() {

    logging_info( "stopping base communication and transport ..." );

    transport->stop();
    delete transport;
    delete network;

    basecommStarted = false;
    logging_info( "base communication stopped" );
}

// Returns the flag that start() sets and stop() clears.
bool BaseCommunication::isStarted(){
    return basecommStarted;
}

// Initiates a link to the endpoint described by `descriptor`.
//
// descriptor  remote endpoint; its locator must not be NULL.
// link_id     id to use for the new link; when unspecified a fresh id
//             is created with LinkID::create().
// qos, sec    not used anywhere in this function body.
//
// Returns the local link id, or LinkID::UNSPECIFIED when either the
// remote or the local locator is NULL. A successful return only means
// that the open request was handed to the transport: the descriptor is
// stored with an unspecified remote link id and a trailing `false`
// argument (presumably the link-up flag - the incoming-request path in
// receiveMessage() passes `true` there). The fields are completed when
// the open reply is processed in receiveMessage().
const LinkID BaseCommunication::establishLink( const EndpointDescriptor& descriptor, const LinkID& link_id, const QoSParameterSet& qos, const SecurityParameterSet& sec) {

    // copy link id
    LinkID linkid = link_id;

    // debug
    logging_debug( "request to establish link" );

    //
    // just use the first locator in the endp descriptors
    //

    if( descriptor.locator == NULL ){
        logging_error( "invalid destination endpoint" );
        return LinkID::UNSPECIFIED;
    }

    if( localDescriptor.locator == NULL ){
        logging_error( "invalid local endpoint" );
        return LinkID::UNSPECIFIED;
    }

    const NetworkLocator* remote = descriptor.locator;
    const NetworkLocator* local = localDescriptor.locator;

    // create link identifier and link descriptor
    if (linkid.isUnspecified()){
        linkid = LinkID::create();
        assert(!linkid.isUnspecified());
    }

    logging_debug( "creating new local descriptor entry with local link id " << linkid.toString() );
    LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor, false );
    addLink( linkDescriptor );

    //
    // create a base msg with our link id and
    // a request to open a link on the other side
    //

    logging_debug( "sending out base messages with request to open link to " << remote->toString() );
    AribaBaseMsg baseMsg( remote, AribaBaseMsg::LINK_STATE_OPEN_REQUEST, linkid, LinkID::UNSPECIFIED );
    transport->sendMessage(&baseMsg);

    return linkid;
}

// Closes the link with local id `link`.
//
// If the link is unknown an error is logged and nothing else happens.
// Otherwise a LINK_STATE_CLOSE_REQUEST is sent to the remote locator,
// every registered event listener gets onLinkDown(), and the descriptor
// is removed from linkSet. No reply from the remote side is awaited.
void BaseCommunication::dropLink(const LinkID link) {

    logging_debug( "starting to drop link " + link.toString() );

    // see if we have the link
    LinkDescriptor& descriptor = queryLocalLink( link );
    if( descriptor.isUnspecified() ){
        logging_error( "don't know the link you want to drop "+ link.toString() );
        return;
    }

    // create message to drop the link
    logging_debug( "sending out link close request. for us, the link is closed now" );
    AribaBaseMsg msg( descriptor.remoteLocator, AribaBaseMsg::LINK_STATE_CLOSE_REQUEST, descriptor.localLink, descriptor.remoteLink );

    // send message to drop the link
    transport->sendMessage( &msg );

    // tell the registered listeners
    BOOST_FOREACH( CommunicationEvents* i, eventListener ){
        i->onLinkDown( link, descriptor.localLocator, descriptor.remoteLocator );
    }

    // remove from map
    // (done last: `descriptor` is a reference to the stored entry and is
    // still read by the listener loop above)
    removeLink(link);
}

// Sends `message` as payload of a LINK_STATE_DATA message over link `lid`.
//
// Returns the incremented sequence counter on success. Returns -1 when
// the link is unknown or not yet up; in both cases an error is logged
// and nothing is handed to the transport.
//
// NOTE(review): the payload is encapsulated before the link-up check, so
// the early return happens after `msg` already wraps `message`. What
// that means for ownership of `message` depends on
// AribaBaseMsg::encapsulate(), which is not visible here.
// NOTE(review): -1 is returned as seqnum_t; if that type is unsigned,
// callers have to compare against the converted value.
seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {

    logging_debug( "sending out message to link " << lid.toString() );

    // query local link info
    LinkDescriptor& linkDesc = queryLocalLink(lid);
    if( linkDesc.isUnspecified() ){
        logging_error( "don't know the link with id " << lid.toString() );
        return -1;
    }

    // create message
    AribaBaseMsg msg( linkDesc.remoteLocator, AribaBaseMsg::LINK_STATE_DATA, linkDesc.localLink, linkDesc.remoteLink );

    // encapsulate the payload message
    // NOTE(review): the template argument of this const_cast is missing
    // in this copy of the file (presumably Message*).
    msg.encapsulate( const_cast(message) );

    if( !linkDesc.linkup ){
        logging_error("cant send message on link " << lid.toString() << ", link not up");
        return -1;
    }

    // send message
    transport->sendMessage( &msg );

    return ++currentSeqnum;
}

// Returns an endpoint descriptor by reference:
//  - the local descriptor when `link` equals LinkID::UNSPECIFIED,
//  - EndpointDescriptor::UNSPECIFIED when the link is unknown,
//  - otherwise the remote endpoint stored for that link.
// In the last case the reference points into the stored link
// descriptor.
const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {

    if( link == LinkID::UNSPECIFIED){
        return localDescriptor;
    } else {
        LinkDescriptor& linkDesc = queryLocalLink(link);
        if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED;
        return linkDesc.remoteEndpoint;
    }
}

// Sets the single receiver that gets incoming data messages. A previously
// registered receiver is overwritten.
void BaseCommunication::registerMessageReceiver(MessageReceiver* _receiver) {
    messageReceiver = _receiver;
}

// Clears the message receiver. The argument is not inspected: whatever
// receiver is currently set is removed.
void BaseCommunication::unregisterMessageReceiver(MessageReceiver* _receiver) {
    messageReceiver = NULL;
}

// Adds `_events` to the listener set unless it is already contained.
void BaseCommunication::registerEventListener(CommunicationEvents* _events){

    if( eventListener.find( _events ) == eventListener.end() )
        eventListener.insert( _events );
}

// Removes `_events` from the listener set; does nothing when it is not
// registered.
void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){

    EventListenerSet::iterator i = eventListener.find( _events );
    if( i != eventListener.end() )
        eventListener.erase( i );
}

// Entry point for messages coming up from the transport (this object is
// registered there in start()). The link id and node id parameters are
// unnamed and unused.
//
// The AribaBaseMsg is unpacked from `message` and handled by type:
//   LINK_STATE_DATA           forwarded to the message receiver, if any
//   LINK_STATE_OPEN_REQUEST   listeners are asked; on approval a
//                             descriptor is stored, an open reply is
//                             sent and onLinkUp() is raised
//   LINK_STATE_OPEN_REPLY     completes a locally initiated link and
//                             raises onLinkUp()
//   LINK_STATE_CLOSE_REQUEST  raises onLinkDown() and removes the link
//   LINK_STATE_UPDATE         replaces the remote locator of a link and
//                             raises onLinkChanged()
// Any other type falls through without action.
//
// Returns false when a link id is unspecified or a referenced link is
// not found; true otherwise, including the case where the listeners
// denied an open request.
bool BaseCommunication::receiveMessage(const Message* message, const LinkID& /*invalid*/, const NodeID& ){

    //
    // these messages arrive from the Transport module
    // and are incoming network messages. Unpack the
    // AribaBaseMsg and handle control packets,
    // deliver data packets to the overlay
    //

    // NOTE(review): decapsulate() presumably carried a template argument
    // (AribaBaseMsg) that is missing in this copy of the file. The
    // result is used without a NULL check and is not released anywhere
    // in this function - confirm who owns it.
    AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate();
    logging_debug( "receiving base comm message of type " << spovmsg->getTypeString() );

    //
    // deliver data to the overlays. we just give the
    // inner packet to every registered overlay ...
    //

    if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_DATA ){

        logging_debug( "received data message, forwarding to overlay" );

        //
        // put the linkid as address into the message
        // and send it to the receiver
        //

        // data is silently dropped when no receiver is registered
        if( messageReceiver != NULL ) {
            messageReceiver->receiveMessage( spovmsg, spovmsg->getRemoteLink(), NodeID::UNSPECIFIED );
        }

    } // LINK_STATE_DATA

    //
    // handle link open requests
    //

    else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REQUEST ){

        logging_debug( "received link open request" );

        //
        // create a link context
        //

        // in an incoming packet the localLink is from
        // the sender perspective local and from our
        // perspective remote

        logging_debug( "creating local link" );
        LinkID localLink = LinkID::create();
        LinkID remoteLink = spovmsg->getLocalLink();

        if(localLink.isUnspecified()){
            logging_error("local link is unspecified");
            return false;
        }

        if(remoteLink.isUnspecified()){
            logging_error("remote link is unspecified");
            return false;
        }

        // NOTE(review): both casts have lost their template argument in
        // this copy of the file; the results are dereferenced in the log
        // statement below without a NULL check.
        const NetworkLocator* localLocator = dynamic_cast(localDescriptor.locator);
        const NetworkLocator* remoteLocator = dynamic_cast(message->getSourceAddress());

        logging_debug( "localLocator=" << localLocator->toString() << " remoteLocator=" << remoteLocator->toString());

        // ask the registered listeners if this link
        // creation is fine. we will only allow the
        // link if all of them agree
        // (&= does not short-circuit, so every listener is asked even
        // after one has already refused)

        bool allowlink = true;
        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
            allowlink &= i->onLinkRequest( localLink, localLocator, remoteLocator );
        }

        // a denied request is dropped here without any message being
        // sent back to the requester
        if( !allowlink ){
            logging_warn( "overlay denied creation of link" );
            return true;
        }

        //
        // create and save the descriptor for the link
        //

        LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink, remoteLocator, EndpointDescriptor(remoteLocator), true);

        logging_debug( "saving new link descriptor with " << "[local link " << localLink.toString() << "] " << "[local locator " << localLocator->toString() << "] " << "[remote link " << remoteLink.toString() << "] " << "[remote locator " << remoteLocator->toString() << "]" << "[link up true]" );

        addLink( linkDescriptor );

        //
        // send out a link reply
        //

        logging_debug( "sending back link open reply for " << "[local link " << localLink.toString() << "] " << "[remote link " << remoteLink.toString() << "]" );

        AribaBaseMsg reply(remoteLocator, AribaBaseMsg::LINK_STATE_OPEN_REPLY, localLink, remoteLink);
        transport->sendMessage( &reply );

        //
        // the link is now open
        //

        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
            i->onLinkUp( localLink, localLocator, remoteLocator );
        }

    } // LINK_STATE_OPEN_REQUEST

    //
    // handle link open replies
    //

    else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REPLY ){

        logging_debug( "received link open reply for a link we initiated" );

        // this is a reply to a link open request, so we have already
        // a link mapping and can now set the remote link to valid
        // (the sender's "remote link" is our local link id)
        LinkDescriptor& linkDesc = queryLocalLink( spovmsg->getRemoteLink() );

        if (linkDesc.isUnspecified()) {
            logging_warn("failed to find local link " << spovmsg->getRemoteLink().toString());
            return false;
        }

        linkDesc.remoteLink = spovmsg->getLocalLink();
        linkDesc.linkup = true;

        logging_debug( "the link is now up with local link id " << linkDesc.localLink.toString() << " and remote link id " << linkDesc.remoteLink.toString() );

        // notify the baseoverlay that the link is up, so
        // it can exchange nodeids over this link. then we
        // can send the queued messages, as both nodes have
        // to know their nodeids first

        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
            i->onLinkUp( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
        }

    } // LINK_STATE_OPEN_REPLY

    //
    // handle link close requests
    //

    else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_CLOSE_REQUEST ){

        const LinkID& localLink = spovmsg->getRemoteLink();
        logging_debug( "received link close request for link " << localLink.toString() );

        //
        // the link is closed immediately, we
        // don't need to send out a reply, so we
        // delete the mapping and inform
        //

        LinkDescriptor& linkDesc = queryLocalLink( localLink );
        if (linkDesc.isUnspecified()) {
            logging_warn("Failed to find local link " << localLink.toString());
            return false;
        }

        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
            i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
        }

        //
        // remove the link descriptor
        //

        removeLink( localLink );

    } // LINK_STATE_CLOSE_REQUEST

    //
    // handle locator updates
    //

    else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_UPDATE ){

        const LinkID& localLink = spovmsg->getRemoteLink();
        logging_debug( "received link update for link " << localLink.toString() );

        //
        // find the link description
        //

        LinkDescriptor& linkDesc = queryLocalLink( localLink );
        if (linkDesc.isUnspecified()) {
            logging_warn("Failed to update local link " << localLink.toString());
            return false;
        }

        //
        // update the remote locator
        //

        // the new remote locator is taken from the source address of the
        // received message, not from the message payload
        const NetworkLocator* oldremote = linkDesc.remoteLocator;
        // NOTE(review): template argument of the cast is missing in this
        // copy of the file.
        linkDesc.remoteLocator = dynamic_cast(message->getSourceAddress());

        //
        // inform the listeners (local link has _not_ changed!)
        //

        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
            i->onLinkChanged(
                linkDesc.localLink, // linkid
                linkDesc.localLocator, // old local
                linkDesc.localLocator, // new local
                oldremote, // old remote
                linkDesc.remoteLocator // new remote
            );
        }

    } // LINK_STATE_UPDATE

    return true;
}

// Appends a copy of `link` to linkSet. No check for an already existing
// entry with the same local link id is made.
void BaseCommunication::addLink( const LinkDescriptor& link ) {
    linkSet.push_back( link );
}

// Erases the first entry of linkSet whose localLink equals `localLink`.
// Does nothing when no entry matches. The loop is left right after the
// erase, so the invalidated iterator is not used again.
void BaseCommunication::removeLink( const LinkID& localLink ) {

    LinkSet::iterator i = linkSet.begin();
    LinkSet::iterator iend = linkSet.end();

    for( ; i != iend; i++){
        if( (*i).localLink != localLink) continue;

        linkSet.erase( i );
        break;
    }
}

// Looks up a link descriptor by local link id; callers in this file test
// the result with isUnspecified() to detect a miss.
//
// NOTE(review): text has been lost from this copy of the file at this
// point. The loop header below is cut off after "i", and the rest of
// this function together with the beginning of the following function
// (its signature and first statements, up to the middle of a
// "...->getAddresses()" call) is missing. Restore the span from version
// control before building. The statements that follow belong to that
// later function: presumably the network change callback registered
// through networkMonitor in start() - verify against the header.
BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
    for (int i=0; igetAddresses();
    NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
    NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();

    //
    // remember the old local endpoint, in case it changes
    //

    EndpointDescriptor oldLocalDescriptor( localDescriptor );

    //
    // look for local locators that we can use in communication
    //
    // choose the first locator that is not localhost
    //

    bool foundLocator = false;
    bool changedLocator = false;

    for( ; i != iend; i++){
        logging_debug( "local locator found " << (*i)->toString() );
        // NOTE(review): template argument of the cast is missing in this
        // copy of the file; the result is dereferenced unchecked.
        IPv4Locator* ipv4locator = dynamic_cast(*i);

        if( *ipv4locator != IPv4Locator::LOCALHOST && *ipv4locator != IPv4Locator::ANY && *ipv4locator != IPv4Locator::BROADCAST ){

            // the port is set before the comparison below
            ipv4locator->setPort( listenport );
            // NOTE(review): localDescriptor.locator is dereferenced
            // without a NULL check, here and in the fallback below.
            changedLocator = *localDescriptor.locator != *ipv4locator;
            localDescriptor.locator = ipv4locator;
            logging_info( "binding to addr = " << ipv4locator->toString() );
            foundLocator = true;
            break;
        }
    } // for( ; i != iend; i++)

    //
    // if we found no locator, bind to localhost
    //

    if( !foundLocator ){
        changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
        localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
        ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
        logging_info( "found no good local lcoator, binding to addr = " << localDescriptor.locator->toString() );
    }

    //
    // if we have connections whose local endpoint no longer exists,
    // close these. they will be automatically built up again.
    // also update the local locator in the linkset mapping
    //

    if( changedLocator ){

        logging_debug( "local endp locator has changed to " << localDescriptor.toString() << ", resettings connections that end at old locator " << oldLocalDescriptor.toString());

        LinkSet::iterator i = linkSet.begin();
        LinkSet::iterator iend = linkSet.end();

        for( ; i != iend; i++ ){

            logging_debug( "checking connection for locator change: " << " local " << (*i).localLocator->toString() << " old " << oldLocalDescriptor.locator->toString() );

            // only links that ended at the old local locator are
            // terminated and re-pointed to the new one
            if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){

                logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
                transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );

                (*i).localLocator = localDescriptor.locator;
            }
        } // for( ; i != iend; i++ )

        // wait 500ms to give the sockets time to shut down
        // (this blocks the calling thread)
        usleep( 500000 );

    } else {

        logging_debug( "locator has not changed, not resetting connections" );

    }

    //
    // handle the connections that have no longer any
    // valid locator. send update messages with the new
    // locator, so the remote node updates its locator/link mapping
    //

    // this loop is outside the changedLocator branch: an update message
    // is sent for every entry in linkSet, whether or not the local
    // locator changed
    LinkSet::iterator iAffected = linkSet.begin();
    LinkSet::iterator endAffected = linkSet.end();

    for( ; iAffected != endAffected; iAffected++ ){
        LinkDescriptor descr = *iAffected;
        logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );

        AribaBaseMsg updateMsg( descr.remoteLocator, AribaBaseMsg::LINK_STATE_UPDATE, descr.localLink, descr.remoteLink );

        transport->sendMessage( &updateMsg );
    }
}

}} // namespace ariba, communication