An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/communication/BaseCommunication.cpp @ 10653

Last change on this file since 10653 was 10653, checked in by Michael Tänzer, 9 years ago

Merge the ASIO branch back into trunk

File size: 23.7 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseCommunication.h"
40
41#include "networkinfo/AddressDiscovery.h"
42#include "ariba/utility/types/PeerID.h"
43#include <boost/function.hpp>
44
45#ifdef UNDERLAY_OMNET
46  #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
47  #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
48  #include "ariba/utility/system/StartupWrapper.h"
49
50  using ariba::communication::AribaOmnetModule;
51  using ariba::communication::OmnetNetworkProtocol;
52  using ariba::utility::StartupWrapper;
53#endif
54
55namespace ariba {
56namespace communication {
57
58using ariba::utility::PeerID;
59
60use_logging_cpp(BaseCommunication);
61
62/// adds an endpoint to the list
63void BaseCommunication::add_endpoint( const address_v* endpoint ) {
64        if (endpoint==NULL) return;
65        BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
66                if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
67                        ref.count++;
68                        return;
69                }
70        }
71        endpoint_reference ref;
72        ref.endpoint = endpoint->clone();
73        ref.count = 1;
74        remote_endpoints.push_back(ref);
75}
76
77/// removes an endpoint from the list
78void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
79        if (endpoint==NULL) return;
80        for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
81                i != remote_endpoints.end(); i++) {
82                if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
83                        i->count--;
84                        if (i->count==0) {
85                                logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
86                                transport->terminate(i->endpoint);
87                                delete i->endpoint;
88                                remote_endpoints.erase(i);
89                        }
90                        return;
91                }
92        }
93}
94
95
96BaseCommunication::BaseCommunication() 
97        : currentSeqnum( 0 ),
98          transport( NULL ),
99          messageReceiver( NULL ),
100          started( false )
101{
102}
103
104
105BaseCommunication::~BaseCommunication(){
106}
107
108
109void BaseCommunication::start() {
110        logging_info( "Starting up ..." );
111        currentSeqnum = 0;
112
113        // set local peer id
114        localDescriptor.getPeerId() = PeerID::random();
115        logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
116
117        // creating transports
118        logging_info( "Creating transports ..." );
119
120#ifdef UNDERLAY_OMNET
121        AribaOmnetModule* module = StartupWrapper::getCurrentModule();
122        module->setServerPort( listenport );
123
124        transport = module;
125        network = new OmnetNetworkProtocol( module );
126#else
127        transport = new transport_peer( localDescriptor.getEndpoints() );
128#endif
129
130        logging_info( "Searching for local locators ..." );
131        /**
132         * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0)
133         * since addresses are used to initialize transport addresses
134         */
135        AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
136        logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
137
138        transport->register_listener( this );
139        transport->start();
140
141#ifndef UNDERLAY_OMNET
142        // bind to the network change detection
143        networkMonitor.registerNotification( this );
144#endif
145
146        // base comm startup done
147        started = true;
148        logging_info( "Started up." );
149}
150
151void BaseCommunication::stop() {
152        logging_info( "Stopping transports ..." );
153
154        transport->stop();
155        delete transport;
156        started = false;
157
158        logging_info( "Stopped." );
159}
160
161bool BaseCommunication::isStarted(){
162        return started;
163}
164
165/// Sets the endpoints
166void BaseCommunication::setEndpoints( string& _endpoints ) {
167        localDescriptor.getEndpoints().assign(_endpoints);
168        logging_info("Setting local end-points: "
169                << localDescriptor.getEndpoints().to_string());
170}
171
172const LinkID BaseCommunication::establishLink(
173        const EndpointDescriptor& descriptor,
174        const LinkID& link_id,
175        const QoSParameterSet& qos,
176        const SecurityParameterSet& sec) {
177
178        // copy link id
179        LinkID linkid = link_id;
180
181        // debug
182        logging_debug( "Request to establish link" );
183
184        // create link identifier
185        if (linkid.isUnspecified())     linkid = LinkID::create();
186
187        // create link descriptor
188        logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
189        LinkDescriptor* ld = new LinkDescriptor();
190        ld->localLink = linkid;
191        addLink( ld );
192
193        // send a message to request new link to remote
194        logging_debug( "Send messages with request to open link to " << descriptor.toString() );
195        AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
196        baseMsg.getLocalDescriptor() = localDescriptor;
197        baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
198
199        // serialize and send message
200        send( &baseMsg, descriptor );
201
202        return linkid;
203}
204
205void BaseCommunication::dropLink(const LinkID link) {
206
207        logging_debug( "Starting to drop link " + link.toString() );
208
209        // see if we have the link
210        LinkDescriptor& ld = queryLocalLink( link );
211        if( ld.isUnspecified() ) {
212                logging_error( "Don't know the link you want to drop "+ link.toString() );
213                return;
214        }
215
216        // tell the registered listeners
217        BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
218                i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
219        }
220
221        // create message to drop the link
222        logging_debug( "Sending out link close request. for us, the link is closed now" );
223        AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
224
225        // send message to drop the link
226        send( &msg, ld );
227
228        // remove from map
229        removeLink(link);
230}
231
232seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
233
234        logging_debug( "Sending out message to link " << lid.toString() );
235
236        // query local link info
237        LinkDescriptor& ld = queryLocalLink(lid);
238        if( ld.isUnspecified() ){
239                logging_error( "Don't know the link with id " << lid.toString() );
240                return -1;
241        }
242
243        // link not up-> error
244        if( !ld.up ) {
245                logging_error("Can not send on link " << lid.toString() << ": link not up");
246                return -1;
247        }
248
249        // create message
250        AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
251
252        // encapsulate the payload message
253        msg.encapsulate( const_cast<Message*>(message) );
254
255        // send message
256        send( &msg, ld );
257
258        // return sequence number
259        return ++currentSeqnum;
260}
261
262const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
263        if( link.isUnspecified() ){
264                return localDescriptor;
265        } else {
266                LinkDescriptor& linkDesc = queryLocalLink(link);
267                if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
268                return linkDesc.remoteEndpoint;
269        }
270}
271
272void BaseCommunication::registerEventListener(CommunicationEvents* _events){
273        if( eventListener.find( _events ) == eventListener.end() )
274                eventListener.insert( _events );
275}
276
277void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
278        EventListenerSet::iterator i = eventListener.find( _events );
279        if( i != eventListener.end() )
280                eventListener.erase( i );
281}
282
283SystemEventType TransportEvent("Transport");
284SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
285
286/// called when a system event is emitted by system queue
287void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
288
289        // dispatch received messages
290        if ( event.getType() == MessageDispatchEvent ){
291                logging_debug( "Forwarding message receiver" );
292                boost::function0<void>* handler = event.getData< boost::function0<void> >();
293                (*handler)();
294                delete handler;
295        }
296}
297
298/**
299 * called within the ASIO thread
300 * when a message is received from underlay transport
301 */ 
302void BaseCommunication::receive_message(transport_connection::sptr connection,
303        reboost::message_t msg) {
304
305        logging_debug( "Dispatching message" );
306       
307    boost::function0<void>* handler = new boost::function0<void>(
308            boost::bind(
309                    &BaseCommunication::receiveMessage,
310                    this,
311                    connection,
312                    msg)
313    );
314   
315    SystemQueue::instance().scheduleEvent(
316        SystemEvent(this, MessageDispatchEvent, handler)
317    );
318}
319
320/**
321 * called within the ARIBA thread (System Queue)
322 * when a message is received from underlay transport
323 */ 
324void BaseCommunication::receiveMessage(transport_connection::sptr connection,
325        reboost::message_t message)
326{
327   
328    //// Adapt to old message system ////
329    // Copy data
330    size_t bytes_len = message.size();
331    uint8_t* bytes = new uint8_t[bytes_len];
332    message.read(bytes, 0, bytes_len);
333   
334    Data data(bytes, bytes_len * 8);
335   
336    Message legacy_message;
337    legacy_message.setPayload(data);
338   
339   
340   
341        /// decapsulate message
342        AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
343        logging_debug( "Receiving message of type " << msg->getTypeString() );
344
345        // handle message
346        switch (msg->getType()) {
347
348                // ---------------------------------------------------------------------
349                // data message
350                // ---------------------------------------------------------------------
351                case AribaBaseMsg::typeData: {
352                        logging_debug( "Received data message, forwarding to overlay" );
353                        if( messageReceiver != NULL ) {
354                                messageReceiver->receiveMessage(
355                                        msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
356                                );
357                        }
358                        break;
359                }
360
361                // ---------------------------------------------------------------------
362                // handle link request from remote
363                // ---------------------------------------------------------------------
364                case AribaBaseMsg::typeLinkRequest: {
365                        logging_debug( "Received link open request" );
366
367                        /// not the correct peer id-> skip request
368                        if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
369                                && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
370                                logging_info("Received link request for "
371                                        << msg->getRemoteDescriptor().getPeerId().toString()
372                                        << "but i'm "
373                                        << localDescriptor.getPeerId()
374                                        << ": Ignoring!");
375                                break;
376                        }
377
378                        /// only answer the first request
379                        if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
380                                logging_debug("Link request already received. Ignore!");
381                                break;
382                        }
383
384                        /// create link ids
385                        LinkID localLink  = LinkID::create();
386                        LinkID remoteLink = msg->getLocalLink();
387                        logging_debug(
388                                "local=" << connection->getLocalEndpoint()->to_string()
389                                << " remote=" << connection->getRemoteEndpoint()->to_string()
390                        );
391
392                        // check if link creation is allowed by ALL listeners
393                        bool allowlink = true;
394                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
395                                allowlink &= i->onLinkRequest( localLink,
396                                        connection->getLocalEndpoint(),
397                                        connection->getRemoteEndpoint());
398                        }
399
400                        // not allowed-> warn
401                        if( !allowlink ){
402                                logging_warn( "Overlay denied creation of link" );
403                                delete msg;
404                                return;
405                        }
406
407                        // create descriptor
408                        LinkDescriptor* ld = new LinkDescriptor();
409                        ld->localLink = localLink;
410                        ld->remoteLink = remoteLink;
411                        ld->localLocator = connection->getLocalEndpoint()->clone();
412                        ld->remoteLocator = connection->getRemoteEndpoint()->clone();
413                        ld->connection = connection;
414                        ld->remoteEndpoint = msg->getLocalDescriptor();
415                        add_endpoint(ld->remoteLocator);
416
417                        // add layer 1-3 addresses
418                        ld->remoteEndpoint.getEndpoints().add(
419                                ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
420                        localDescriptor.getEndpoints().add(
421                                connection->getLocalEndpoint(),
422                                endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
423
424                        // link is now up-> add it
425                        ld->up = true;
426                        addLink(ld);
427
428                        // link is up!
429                        logging_debug( "Link (initiated from remote) is up with "
430                                << "local(id=" << ld->localLink.toString() << ","
431                                << "locator=" << ld->localLocator->to_string() << ") "
432                                << "remote(id=" << ld->remoteLink.toString() << ", "
433                                << "locator=" << ld->remoteLocator->to_string() << ")"
434                        );
435
436                        // sending link request reply
437                        logging_debug( "Sending link request reply with ids "
438                                << "local=" << localLink.toString() << ", "
439                                << "remote=" << remoteLink.toString() );
440                        AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
441                        reply.getLocalDescriptor() = localDescriptor;
442                        reply.getRemoteDescriptor() = ld->remoteEndpoint;
443
444                        send( &reply, *ld );
445
446                        // inform listeners about new open link
447                        BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
448                                i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
449                        }
450
451                        // done
452                        break;
453                }
454
455                // ---------------------------------------------------------------------
456                // handle link request reply
457                // ---------------------------------------------------------------------
458                case AribaBaseMsg::typeLinkReply: {
459                        logging_debug( "Received link open reply for a link we initiated" );
460
461                        // this is a reply to a link open request, so we have already
462                        // a link mapping and can now set the remote link to valid
463                        LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
464
465                        // no link found-> warn!
466                        if (ld.isUnspecified()) {
467                                logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
468                                delete msg;
469                                return;
470                        }
471
472                        // store the connection
473                        ld.connection = connection;
474                       
475                        // set remote locator and link id
476                        ld.remoteLink = msg->getLocalLink();
477                        ld.remoteLocator = connection->getRemoteEndpoint()->clone();
478                        ld.remoteEndpoint.getEndpoints().add(
479                                                        msg->getLocalDescriptor().getEndpoints(),
480                                                        endpoint_set::Layer1_4
481                                                );
482
483                        localDescriptor.getEndpoints().add(
484                                msg->getRemoteDescriptor().getEndpoints(),
485                                endpoint_set::Layer1_3
486                        );
487                        ld.up = true;
488                        add_endpoint(ld.remoteLocator);
489
490                        logging_debug( "Link is now up with local id "
491                                << ld.localLink.toString() << " and remote id "
492                                << ld.remoteLink.toString() );
493
494
495                        // inform lisneters about link up event
496                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
497                                i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
498                        }
499
500                        // done
501                        break;
502                }
503
504                // ---------------------------------------------------------------------
505                // handle link close requests
506                // ---------------------------------------------------------------------
507                case AribaBaseMsg::typeLinkClose: {
508                        // get remote link
509                        const LinkID& localLink = msg->getRemoteLink();
510                        logging_debug( "Received link close request for link " << localLink.toString() );
511
512                        // searching for link, not found-> warn
513                        LinkDescriptor& linkDesc = queryLocalLink( localLink );
514                        if (linkDesc.isUnspecified()) {
515                                logging_warn("Failed to find local link " << localLink.toString());
516                                delete msg;
517                                return;
518                        }
519
520                        // inform listeners
521                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
522                                i->onLinkDown( linkDesc.localLink,
523                                                linkDesc.localLocator, linkDesc.remoteLocator );
524                        }
525
526                        // remove the link descriptor
527                        removeLink( localLink );
528
529                        // done
530                        break;
531                }
532
533                // ---------------------------------------------------------------------
534                // handle link locator changes
535                // ---------------------------------------------------------------------
536                case AribaBaseMsg::typeLinkUpdate: {
537                        const LinkID& localLink = msg->getRemoteLink();
538                        logging_debug( "Received link update for link "
539                                << localLink.toString() );
540
541                        // find the link description
542                        LinkDescriptor& linkDesc = queryLocalLink( localLink );
543                        if (linkDesc.isUnspecified()) {
544                                logging_warn("Failed to update local link "
545                                        << localLink.toString());
546                                delete msg;
547                                return;
548                        }
549
550                        // update the remote locator
551                        const address_v* oldremote = linkDesc.remoteLocator;
552                        linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
553
554                        // inform the listeners (local link has _not_ changed!)
555                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
556                                i->onLinkChanged(
557                                        linkDesc.localLink,     // linkid
558                                        linkDesc.localLocator,  // old local
559                                        linkDesc.localLocator,  // new local
560                                        oldremote,              // old remote
561                                        linkDesc.remoteLocator  // new remote
562                                );
563                        }
564
565                        // done
566                        break;
567                }
568        }
569
570        delete msg;
571}
572
573/// add a newly allocated link to the set of links
574void BaseCommunication::addLink( LinkDescriptor* link ) {
575        linkSet.push_back( link );
576}
577
578/// remove a link from set
579void BaseCommunication::removeLink( const LinkID& localLink ) {
580        for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
581                if( (*i)->localLink != localLink) continue;
582                remove_endpoint((*i)->remoteLocator);
583                delete *i;
584                linkSet.erase( i );
585                break;
586        }
587}
588
589/// query a descriptor by local link id
590BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
591        for (size_t i=0; i<linkSet.size();i++)
592                if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
593
594        return LinkDescriptor::UNSPECIFIED();
595}
596
597/// query a descriptor by remote link id
598BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
599        for (size_t i=0; i<linkSet.size();i++)
600                if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
601
602        return LinkDescriptor::UNSPECIFIED();
603}
604
605LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
606        LinkIDs ids;
607        for (size_t i=0; i<linkSet.size(); i++){
608                if( addr == NULL ){
609                        ids.push_back( linkSet[i]->localLink );
610                } else {
611                        if ( *linkSet[i]->remoteLocator == *addr )
612                                ids.push_back( linkSet[i]->localLink );
613                }
614        }
615        return ids;
616}
617
618void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
619
620#ifdef UNDERLAY_OMNET
621
622        // we have no mobility support for simulations
623        return
624
625#endif // UNDERLAY_OMNET
626
627/*- disabled!
628
629        // we only care about address changes, not about interface changes
630        // as address changes are triggered by interface changes, we are safe here
631        if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
632                info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
633
634        logging_info( "base communication is handling network address changes" );
635
636        // get all now available addresses
637        NetworkInformation networkInformation;
638        AddressInformation addressInformation;
639
640        NetworkInterfaceList interfaces = networkInformation.getInterfaces();
641        AddressList addresses;
642
643        for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
644                AddressList newaddr = addressInformation.getAddresses(*i);
645                addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
646        }
647
648        //
649        // get current locators for the local endpoint
650        // TODO: this code is dublicate of the ctor code!!! cleanup!
651        //
652
653        NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
654        NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
655        NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
656
657        //
658        // remember the old local endpoint, in case it changes
659        //
660
661        EndpointDescriptor oldLocalDescriptor( localDescriptor );
662
663        //
664        // look for local locators that we can use in communication
665        //
666        // choose the first locator that is not localhost
667        //
668
669        bool foundLocator = false;
670        bool changedLocator = false;
671
672        for( ; i != iend; i++){
673                logging_debug( "local locator found " << (*i)->toString() );
674                IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
675
676                if( *ipv4locator != IPv4Locator::LOCALHOST &&
677                    *ipv4locator != IPv4Locator::ANY       &&
678                    *ipv4locator != IPv4Locator::BROADCAST  ){
679
680                        ipv4locator->setPort( listenport );
681                        changedLocator = *localDescriptor.locator != *ipv4locator;
682                        localDescriptor.locator = ipv4locator;
683                        logging_info( "binding to addr = " << ipv4locator->toString() );
684                        foundLocator = true;
685                        break;
686                }
687        } // for( ; i != iend; i++)
688
689        //
690        // if we found no locator, bind to localhost
691        //
692
693        if( !foundLocator ){
694                changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
695                localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
696                ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
697                logging_info( "found no good local lcoator, binding to addr = " <<
698                                                localDescriptor.locator->toString() );
699        }
700
701        //
702        // if we have connections that have no more longer endpoints
703        // close these. they will be automatically built up again.
704        // also update the local locator in the linkset mapping
705        //
706
707        if( changedLocator ){
708
709                logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
710                                ", resettings connections that end at old locator " <<
711                                        oldLocalDescriptor.toString());
712
713                LinkSet::iterator i = linkSet.begin();
714                LinkSet::iterator iend = linkSet.end();
715
716                for( ; i != iend; i++ ){
717
718                        logging_debug( "checking connection for locator change: " <<
719                                        " local " << (*i).localLocator->toString() <<
720                                        " old " << oldLocalDescriptor.locator->toString() );
721
722                        if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
723
724                                logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
725                                transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
726
727                                (*i).localLocator = localDescriptor.locator;
728                        }
729                } // for( ; i != iend; i++ )
730
731                // wait 500ms to give the sockets time to shut down
732                usleep( 500000 );
733
734        } else {
735
736                logging_debug( "locator has not changed, not resetting connections" );
737
738        }
739
740        //
741        // handle the connections that have no longer any
742        // valid locator. send update messages with the new
743        // locator,  so the remote node updates its locator/link mapping
744        //
745
746        LinkSet::iterator iAffected = linkSet.begin();
747        LinkSet::iterator endAffected = linkSet.end();
748
749        for( ; iAffected != endAffected; iAffected++ ){
750                LinkDescriptor descr = *iAffected;
751                logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
752
753                AribaBaseMsg updateMsg(         descr.remoteLocator,
754                                                AribaBaseMsg::LINK_STATE_UPDATE,
755                                                descr.localLink, descr.remoteLink );
756
757                transport->sendMessage( &updateMsg );
758        }
759*/
760}
761
762/// sends a message to all end-points in the end-point descriptor
763void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
764        Data data = data_serialize(legacy_message, DEFAULT_V);
765       
766        //// Adapt to new message system ////
767        // transfer data buffer ownership to the shared_buffer
768    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
769       
770        reboost::message_t message;
771        message.push_back(buf);
772       
773        transport->send(endpoint.getEndpoints(), message);
774}
775
776/// sends a message to the remote locator inside the link descriptor
777void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
778        if (desc.remoteLocator==NULL) return;
779       
780        Data data = data_serialize(legacy_message, DEFAULT_V);
781   
782    //// Adapt to new message system ////
783    // transfer data buffer ownership to the shared_buffer
784    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
785   
786    reboost::message_t message;
787    message.push_back(buf);
788   
789        desc.connection->send(message);
790}
791
792}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.