source: source/ariba/communication/BaseCommunication.cpp@ 10767

Last change on this file since 10767 was 10767, checked in by Michael Tänzer, 12 years ago

Clean up NetworkChangeDetection

File size: 23.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseCommunication.h"
40
41#include "networkinfo/AddressDiscovery.h"
42#include "ariba/utility/types/PeerID.h"
43#include "ariba/utility/system/SystemQueue.h"
44#include <boost/function.hpp>
45
46#ifdef UNDERLAY_OMNET
47 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
48 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
49 #include "ariba/utility/system/StartupWrapper.h"
50
51 using ariba::communication::AribaOmnetModule;
52 using ariba::communication::OmnetNetworkProtocol;
53 using ariba::utility::StartupWrapper;
54#endif
55
56namespace ariba {
57namespace communication {
58
59using ariba::utility::PeerID;
60using ariba::utility::SystemQueue;
61
62use_logging_cpp(BaseCommunication);
63
64/// adds an endpoint to the list
65void BaseCommunication::add_endpoint( const address_v* endpoint ) {
66 if (endpoint==NULL) return;
67 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
68 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
69 ref.count++;
70 return;
71 }
72 }
73 endpoint_reference ref;
74 ref.endpoint = endpoint->clone();
75 ref.count = 1;
76 remote_endpoints.push_back(ref);
77}
78
79/// removes an endpoint from the list
80void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
81 if (endpoint==NULL) return;
82 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
83 i != remote_endpoints.end(); i++) {
84 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
85 i->count--;
86 if (i->count==0) {
87 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
88 transport->terminate(i->endpoint);
89 delete i->endpoint;
90 remote_endpoints.erase(i);
91 }
92 return;
93 }
94 }
95}
96
97
98BaseCommunication::BaseCommunication()
99 : currentSeqnum( 0 ),
100 transport( NULL ),
101 messageReceiver( NULL ),
102 started( false )
103{
104}
105
106
107BaseCommunication::~BaseCommunication(){
108}
109
110
111void BaseCommunication::start() {
112 logging_info( "Starting up ..." );
113 currentSeqnum = 0;
114
115 // set local peer id
116 localDescriptor.getPeerId() = PeerID::random();
117 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
118
119 // creating transports
120 logging_info( "Creating transports ..." );
121
122#ifdef UNDERLAY_OMNET
123 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
124 module->setServerPort( listenport );
125
126 transport = module;
127 network = new OmnetNetworkProtocol( module );
128#else
129 transport = new transport_peer( localDescriptor.getEndpoints() );
130#endif
131
132 logging_info( "Searching for local locators ..." );
133 /**
134 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0)
135 * since addresses are used to initialize transport addresses
136 */
137 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
138 logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
139
140 transport->register_listener( this );
141 transport->start();
142
143#ifndef UNDERLAY_OMNET
144 // bind to the network change detection
145 networkMonitor.registerNotification( this );
146#endif
147
148 // base comm startup done
149 started = true;
150 logging_info( "Started up." );
151}
152
153void BaseCommunication::stop() {
154 logging_info( "Stopping transports ..." );
155
156 transport->stop();
157 delete transport;
158 started = false;
159
160 logging_info( "Stopped." );
161}
162
163bool BaseCommunication::isStarted(){
164 return started;
165}
166
167/// Sets the endpoints
168void BaseCommunication::setEndpoints( string& _endpoints ) {
169 localDescriptor.getEndpoints().assign(_endpoints);
170 logging_info("Setting local end-points: "
171 << localDescriptor.getEndpoints().to_string());
172}
173
174const LinkID BaseCommunication::establishLink(
175 const EndpointDescriptor& descriptor,
176 const LinkID& link_id,
177 const QoSParameterSet& qos,
178 const SecurityParameterSet& sec) {
179
180 // copy link id
181 LinkID linkid = link_id;
182
183 // debug
184 logging_debug( "Request to establish link" );
185
186 // create link identifier
187 if (linkid.isUnspecified()) linkid = LinkID::create();
188
189 // create link descriptor
190 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
191 LinkDescriptor* ld = new LinkDescriptor();
192 ld->localLink = linkid;
193 addLink( ld );
194
195 // send a message to request new link to remote
196 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
197 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
198 baseMsg.getLocalDescriptor() = localDescriptor;
199 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
200
201 // serialize and send message
202 send( &baseMsg, descriptor );
203
204 return linkid;
205}
206
207void BaseCommunication::dropLink(const LinkID link) {
208
209 logging_debug( "Starting to drop link " + link.toString() );
210
211 // see if we have the link
212 LinkDescriptor& ld = queryLocalLink( link );
213 if( ld.isUnspecified() ) {
214 logging_error( "Don't know the link you want to drop "+ link.toString() );
215 return;
216 }
217
218 // tell the registered listeners
219 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
220 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
221 }
222
223 // create message to drop the link
224 logging_debug( "Sending out link close request. for us, the link is closed now" );
225 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
226
227 // send message to drop the link
228 send( &msg, ld );
229
230 // remove from map
231 removeLink(link);
232}
233
234seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
235
236 logging_debug( "Sending out message to link " << lid.toString() );
237
238 // query local link info
239 LinkDescriptor& ld = queryLocalLink(lid);
240 if( ld.isUnspecified() ){
241 logging_error( "Don't know the link with id " << lid.toString() );
242 return -1;
243 }
244
245 // link not up-> error
246 if( !ld.up ) {
247 logging_error("Can not send on link " << lid.toString() << ": link not up");
248 return -1;
249 }
250
251 // create message
252 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
253
254 // encapsulate the payload message
255 msg.encapsulate( const_cast<Message*>(message) );
256
257 // send message
258 send( &msg, ld );
259
260 // return sequence number
261 return ++currentSeqnum;
262}
263
264const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
265 if( link.isUnspecified() ){
266 return localDescriptor;
267 } else {
268 LinkDescriptor& linkDesc = queryLocalLink(link);
269 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
270 return linkDesc.remoteEndpoint;
271 }
272}
273
274void BaseCommunication::registerEventListener(CommunicationEvents* _events){
275 if( eventListener.find( _events ) == eventListener.end() )
276 eventListener.insert( _events );
277}
278
279void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
280 EventListenerSet::iterator i = eventListener.find( _events );
281 if( i != eventListener.end() )
282 eventListener.erase( i );
283}
284
285SystemEventType TransportEvent("Transport");
286SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
287
288/// called when a system event is emitted by system queue
289void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
290
291 // dispatch received messages
292 if ( event.getType() == MessageDispatchEvent ){
293 logging_debug( "Forwarding message receiver" );
294 boost::function0<void>* handler = event.getData< boost::function0<void> >();
295 (*handler)();
296 delete handler;
297 }
298}
299
300/**
301 * called within the ASIO thread
302 * when a message is received from underlay transport
303 */
304void BaseCommunication::receive_message(transport_connection::sptr connection,
305 reboost::message_t msg) {
306
307 logging_debug( "Dispatching message" );
308
309 boost::function0<void>* handler = new boost::function0<void>(
310 boost::bind(
311 &BaseCommunication::receiveMessage,
312 this,
313 connection,
314 msg)
315 );
316
317 SystemQueue::instance().scheduleEvent(
318 SystemEvent(this, MessageDispatchEvent, handler)
319 );
320}
321
322/**
323 * called within the ARIBA thread (System Queue)
324 * when a message is received from underlay transport
325 */
326void BaseCommunication::receiveMessage(transport_connection::sptr connection,
327 reboost::message_t message)
328{
329
330 //// Adapt to old message system ////
331 // Copy data
332 size_t bytes_len = message.size();
333 uint8_t* bytes = new uint8_t[bytes_len];
334 message.read(bytes, 0, bytes_len);
335
336 Data data(bytes, bytes_len * 8);
337
338 Message legacy_message;
339 legacy_message.setPayload(data);
340
341
342
343 /// decapsulate message
344 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
345 logging_debug( "Receiving message of type " << msg->getTypeString() );
346
347 // handle message
348 switch (msg->getType()) {
349
350 // ---------------------------------------------------------------------
351 // data message
352 // ---------------------------------------------------------------------
353 case AribaBaseMsg::typeData: {
354 logging_debug( "Received data message, forwarding to overlay" );
355 if( messageReceiver != NULL ) {
356 messageReceiver->receiveMessage(
357 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
358 );
359 }
360 break;
361 }
362
363 // ---------------------------------------------------------------------
364 // handle link request from remote
365 // ---------------------------------------------------------------------
366 case AribaBaseMsg::typeLinkRequest: {
367 logging_debug( "Received link open request" );
368
369 /// not the correct peer id-> skip request
370 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
371 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
372 logging_info("Received link request for "
373 << msg->getRemoteDescriptor().getPeerId().toString()
374 << "but i'm "
375 << localDescriptor.getPeerId()
376 << ": Ignoring!");
377 break;
378 }
379
380 /// only answer the first request
381 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
382 logging_debug("Link request already received. Ignore!");
383 break;
384 }
385
386 /// create link ids
387 LinkID localLink = LinkID::create();
388 LinkID remoteLink = msg->getLocalLink();
389 logging_debug(
390 "local=" << connection->getLocalEndpoint()->to_string()
391 << " remote=" << connection->getRemoteEndpoint()->to_string()
392 );
393
394 // check if link creation is allowed by ALL listeners
395 bool allowlink = true;
396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
397 allowlink &= i->onLinkRequest( localLink,
398 connection->getLocalEndpoint(),
399 connection->getRemoteEndpoint());
400 }
401
402 // not allowed-> warn
403 if( !allowlink ){
404 logging_warn( "Overlay denied creation of link" );
405 delete msg;
406 return;
407 }
408
409 // create descriptor
410 LinkDescriptor* ld = new LinkDescriptor();
411 ld->localLink = localLink;
412 ld->remoteLink = remoteLink;
413 ld->localLocator = connection->getLocalEndpoint()->clone();
414 ld->remoteLocator = connection->getRemoteEndpoint()->clone();
415 ld->connection = connection;
416 ld->remoteEndpoint = msg->getLocalDescriptor();
417 add_endpoint(ld->remoteLocator);
418
419 // add layer 1-3 addresses
420 ld->remoteEndpoint.getEndpoints().add(
421 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
422 localDescriptor.getEndpoints().add(
423 connection->getLocalEndpoint(),
424 endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
425
426 // link is now up-> add it
427 ld->up = true;
428 addLink(ld);
429
430 // link is up!
431 logging_debug( "Link (initiated from remote) is up with "
432 << "local(id=" << ld->localLink.toString() << ","
433 << "locator=" << ld->localLocator->to_string() << ") "
434 << "remote(id=" << ld->remoteLink.toString() << ", "
435 << "locator=" << ld->remoteLocator->to_string() << ")"
436 );
437
438 // sending link request reply
439 logging_debug( "Sending link request reply with ids "
440 << "local=" << localLink.toString() << ", "
441 << "remote=" << remoteLink.toString() );
442 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
443 reply.getLocalDescriptor() = localDescriptor;
444 reply.getRemoteDescriptor() = ld->remoteEndpoint;
445
446 send( &reply, *ld );
447
448 // inform listeners about new open link
449 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
450 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
451 }
452
453 // done
454 break;
455 }
456
457 // ---------------------------------------------------------------------
458 // handle link request reply
459 // ---------------------------------------------------------------------
460 case AribaBaseMsg::typeLinkReply: {
461 logging_debug( "Received link open reply for a link we initiated" );
462
463 // this is a reply to a link open request, so we have already
464 // a link mapping and can now set the remote link to valid
465 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
466
467 // no link found-> warn!
468 if (ld.isUnspecified()) {
469 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
470 delete msg;
471 return;
472 }
473
474 // store the connection
475 ld.connection = connection;
476
477 // set remote locator and link id
478 ld.remoteLink = msg->getLocalLink();
479 ld.remoteLocator = connection->getRemoteEndpoint()->clone();
480 ld.remoteEndpoint.getEndpoints().add(
481 msg->getLocalDescriptor().getEndpoints(),
482 endpoint_set::Layer1_4
483 );
484
485 localDescriptor.getEndpoints().add(
486 msg->getRemoteDescriptor().getEndpoints(),
487 endpoint_set::Layer1_3
488 );
489 ld.up = true;
490 add_endpoint(ld.remoteLocator);
491
492 logging_debug( "Link is now up with local id "
493 << ld.localLink.toString() << " and remote id "
494 << ld.remoteLink.toString() );
495
496
497 // inform lisneters about link up event
498 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
499 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
500 }
501
502 // done
503 break;
504 }
505
506 // ---------------------------------------------------------------------
507 // handle link close requests
508 // ---------------------------------------------------------------------
509 case AribaBaseMsg::typeLinkClose: {
510 // get remote link
511 const LinkID& localLink = msg->getRemoteLink();
512 logging_debug( "Received link close request for link " << localLink.toString() );
513
514 // searching for link, not found-> warn
515 LinkDescriptor& linkDesc = queryLocalLink( localLink );
516 if (linkDesc.isUnspecified()) {
517 logging_warn("Failed to find local link " << localLink.toString());
518 delete msg;
519 return;
520 }
521
522 // inform listeners
523 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
524 i->onLinkDown( linkDesc.localLink,
525 linkDesc.localLocator, linkDesc.remoteLocator );
526 }
527
528 // remove the link descriptor
529 removeLink( localLink );
530
531 // done
532 break;
533 }
534
535 // ---------------------------------------------------------------------
536 // handle link locator changes
537 // ---------------------------------------------------------------------
538 case AribaBaseMsg::typeLinkUpdate: {
539 const LinkID& localLink = msg->getRemoteLink();
540 logging_debug( "Received link update for link "
541 << localLink.toString() );
542
543 // find the link description
544 LinkDescriptor& linkDesc = queryLocalLink( localLink );
545 if (linkDesc.isUnspecified()) {
546 logging_warn("Failed to update local link "
547 << localLink.toString());
548 delete msg;
549 return;
550 }
551
552 // update the remote locator
553 const address_v* oldremote = linkDesc.remoteLocator;
554 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
555
556 // inform the listeners (local link has _not_ changed!)
557 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
558 i->onLinkChanged(
559 linkDesc.localLink, // linkid
560 linkDesc.localLocator, // old local
561 linkDesc.localLocator, // new local
562 oldremote, // old remote
563 linkDesc.remoteLocator // new remote
564 );
565 }
566
567 // done
568 break;
569 }
570 }
571
572 delete msg;
573}
574
575/// add a newly allocated link to the set of links
576void BaseCommunication::addLink( LinkDescriptor* link ) {
577 linkSet.push_back( link );
578}
579
580/// remove a link from set
581void BaseCommunication::removeLink( const LinkID& localLink ) {
582 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
583 if( (*i)->localLink != localLink) continue;
584 remove_endpoint((*i)->remoteLocator);
585 delete *i;
586 linkSet.erase( i );
587 break;
588 }
589}
590
591/// query a descriptor by local link id
592BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
593 for (size_t i=0; i<linkSet.size();i++)
594 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
595
596 return LinkDescriptor::UNSPECIFIED();
597}
598
599/// query a descriptor by remote link id
600BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
601 for (size_t i=0; i<linkSet.size();i++)
602 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
603
604 return LinkDescriptor::UNSPECIFIED();
605}
606
607LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
608 LinkIDs ids;
609 for (size_t i=0; i<linkSet.size(); i++){
610 if( addr == NULL ){
611 ids.push_back( linkSet[i]->localLink );
612 } else {
613 if ( *linkSet[i]->remoteLocator == *addr )
614 ids.push_back( linkSet[i]->localLink );
615 }
616 }
617 return ids;
618}
619
620void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
621
622#ifdef UNDERLAY_OMNET
623
624 // we have no mobility support for simulations
625 return
626
627#endif // UNDERLAY_OMNET
628
629/*- disabled!
630
631 // we only care about address changes, not about interface changes
632 // as address changes are triggered by interface changes, we are safe here
633 if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
634 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
635
636 logging_info( "base communication is handling network address changes" );
637
638 // get all now available addresses
639 NetworkInformation networkInformation;
640 AddressInformation addressInformation;
641
642 NetworkInterfaceList interfaces = networkInformation.getInterfaces();
643 AddressList addresses;
644
645 for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
646 AddressList newaddr = addressInformation.getAddresses(*i);
647 addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
648 }
649
650 //
651 // get current locators for the local endpoint
652 // TODO: this code is dublicate of the ctor code!!! cleanup!
653 //
654
655 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
656 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
657 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
658
659 //
660 // remember the old local endpoint, in case it changes
661 //
662
663 EndpointDescriptor oldLocalDescriptor( localDescriptor );
664
665 //
666 // look for local locators that we can use in communication
667 //
668 // choose the first locator that is not localhost
669 //
670
671 bool foundLocator = false;
672 bool changedLocator = false;
673
674 for( ; i != iend; i++){
675 logging_debug( "local locator found " << (*i)->toString() );
676 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
677
678 if( *ipv4locator != IPv4Locator::LOCALHOST &&
679 *ipv4locator != IPv4Locator::ANY &&
680 *ipv4locator != IPv4Locator::BROADCAST ){
681
682 ipv4locator->setPort( listenport );
683 changedLocator = *localDescriptor.locator != *ipv4locator;
684 localDescriptor.locator = ipv4locator;
685 logging_info( "binding to addr = " << ipv4locator->toString() );
686 foundLocator = true;
687 break;
688 }
689 } // for( ; i != iend; i++)
690
691 //
692 // if we found no locator, bind to localhost
693 //
694
695 if( !foundLocator ){
696 changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
697 localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
698 ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
699 logging_info( "found no good local lcoator, binding to addr = " <<
700 localDescriptor.locator->toString() );
701 }
702
703 //
704 // if we have connections that have no more longer endpoints
705 // close these. they will be automatically built up again.
706 // also update the local locator in the linkset mapping
707 //
708
709 if( changedLocator ){
710
711 logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
712 ", resettings connections that end at old locator " <<
713 oldLocalDescriptor.toString());
714
715 LinkSet::iterator i = linkSet.begin();
716 LinkSet::iterator iend = linkSet.end();
717
718 for( ; i != iend; i++ ){
719
720 logging_debug( "checking connection for locator change: " <<
721 " local " << (*i).localLocator->toString() <<
722 " old " << oldLocalDescriptor.locator->toString() );
723
724 if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
725
726 logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
727 transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
728
729 (*i).localLocator = localDescriptor.locator;
730 }
731 } // for( ; i != iend; i++ )
732
733 // wait 500ms to give the sockets time to shut down
734 usleep( 500000 );
735
736 } else {
737
738 logging_debug( "locator has not changed, not resetting connections" );
739
740 }
741
742 //
743 // handle the connections that have no longer any
744 // valid locator. send update messages with the new
745 // locator, so the remote node updates its locator/link mapping
746 //
747
748 LinkSet::iterator iAffected = linkSet.begin();
749 LinkSet::iterator endAffected = linkSet.end();
750
751 for( ; iAffected != endAffected; iAffected++ ){
752 LinkDescriptor descr = *iAffected;
753 logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
754
755 AribaBaseMsg updateMsg( descr.remoteLocator,
756 AribaBaseMsg::LINK_STATE_UPDATE,
757 descr.localLink, descr.remoteLink );
758
759 transport->sendMessage( &updateMsg );
760 }
761*/
762}
763
764/// sends a message to all end-points in the end-point descriptor
765void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
766 Data data = data_serialize(legacy_message, DEFAULT_V);
767
768 //// Adapt to new message system ////
769 // transfer data buffer ownership to the shared_buffer
770 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
771
772 reboost::message_t message;
773 message.push_back(buf);
774
775 transport->send(endpoint.getEndpoints(), message);
776}
777
778/// sends a message to the remote locator inside the link descriptor
779void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
780 if (desc.remoteLocator==NULL) return;
781
782 Data data = data_serialize(legacy_message, DEFAULT_V);
783
784 //// Adapt to new message system ////
785 // transfer data buffer ownership to the shared_buffer
786 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
787
788 reboost::message_t message;
789 message.push_back(buf);
790
791 desc.connection->send(message);
792}
793
794}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.