source: source/ariba/communication/BaseCommunication.cpp@ 10688

Last change on this file since 10688 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 23.7 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseCommunication.h"
40
41#include "networkinfo/AddressDiscovery.h"
42#include "ariba/utility/types/PeerID.h"
43#include <boost/function.hpp>
44
45#ifdef UNDERLAY_OMNET
46 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
47 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
48 #include "ariba/utility/system/StartupWrapper.h"
49
50 using ariba::communication::AribaOmnetModule;
51 using ariba::communication::OmnetNetworkProtocol;
52 using ariba::utility::StartupWrapper;
53#endif
54
55namespace ariba {
56namespace communication {
57
58using ariba::utility::PeerID;
59
60use_logging_cpp(BaseCommunication);
61
62/// adds an endpoint to the list
63void BaseCommunication::add_endpoint( const address_v* endpoint ) {
64 if (endpoint==NULL) return;
65 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
66 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
67 ref.count++;
68 return;
69 }
70 }
71 endpoint_reference ref;
72 ref.endpoint = endpoint->clone();
73 ref.count = 1;
74 remote_endpoints.push_back(ref);
75}
76
77/// removes an endpoint from the list
78void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
79 if (endpoint==NULL) return;
80 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
81 i != remote_endpoints.end(); i++) {
82 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
83 i->count--;
84 if (i->count==0) {
85 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
86 transport->terminate(i->endpoint);
87 delete i->endpoint;
88 remote_endpoints.erase(i);
89 }
90 return;
91 }
92 }
93}
94
95
96BaseCommunication::BaseCommunication()
97 : currentSeqnum( 0 ),
98 transport( NULL ),
99 messageReceiver( NULL ),
100 started( false )
101{
102}
103
104
105BaseCommunication::~BaseCommunication(){
106}
107
108
109void BaseCommunication::start() {
110 logging_info( "Starting up ..." );
111 currentSeqnum = 0;
112
113 // set local peer id
114 localDescriptor.getPeerId() = PeerID::random();
115 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
116
117 // creating transports
118 logging_info( "Creating transports ..." );
119
120#ifdef UNDERLAY_OMNET
121 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
122 module->setServerPort( listenport );
123
124 transport = module;
125 network = new OmnetNetworkProtocol( module );
126#else
127 transport = new transport_peer( localDescriptor.getEndpoints() );
128#endif
129
130 logging_info( "Searching for local locators ..." );
131 /**
132 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0)
133 * since addresses are used to initialize transport addresses
134 */
135 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
136 logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
137
138 transport->register_listener( this );
139 transport->start();
140
141#ifndef UNDERLAY_OMNET
142 // bind to the network change detection
143 networkMonitor.registerNotification( this );
144#endif
145
146 // base comm startup done
147 started = true;
148 logging_info( "Started up." );
149}
150
151void BaseCommunication::stop() {
152 logging_info( "Stopping transports ..." );
153
154 transport->stop();
155 delete transport;
156 started = false;
157
158 logging_info( "Stopped." );
159}
160
161bool BaseCommunication::isStarted(){
162 return started;
163}
164
165/// Sets the endpoints
166void BaseCommunication::setEndpoints( string& _endpoints ) {
167 localDescriptor.getEndpoints().assign(_endpoints);
168 logging_info("Setting local end-points: "
169 << localDescriptor.getEndpoints().to_string());
170}
171
172const LinkID BaseCommunication::establishLink(
173 const EndpointDescriptor& descriptor,
174 const LinkID& link_id,
175 const QoSParameterSet& qos,
176 const SecurityParameterSet& sec) {
177
178 // copy link id
179 LinkID linkid = link_id;
180
181 // debug
182 logging_debug( "Request to establish link" );
183
184 // create link identifier
185 if (linkid.isUnspecified()) linkid = LinkID::create();
186
187 // create link descriptor
188 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
189 LinkDescriptor* ld = new LinkDescriptor();
190 ld->localLink = linkid;
191 addLink( ld );
192
193 // send a message to request new link to remote
194 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
195 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
196 baseMsg.getLocalDescriptor() = localDescriptor;
197 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
198
199 // serialize and send message
200 send( &baseMsg, descriptor );
201
202 return linkid;
203}
204
205void BaseCommunication::dropLink(const LinkID link) {
206
207 logging_debug( "Starting to drop link " + link.toString() );
208
209 // see if we have the link
210 LinkDescriptor& ld = queryLocalLink( link );
211 if( ld.isUnspecified() ) {
212 logging_error( "Don't know the link you want to drop "+ link.toString() );
213 return;
214 }
215
216 // tell the registered listeners
217 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
218 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
219 }
220
221 // create message to drop the link
222 logging_debug( "Sending out link close request. for us, the link is closed now" );
223 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
224
225 // send message to drop the link
226 send( &msg, ld );
227
228 // remove from map
229 removeLink(link);
230}
231
232seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
233
234 logging_debug( "Sending out message to link " << lid.toString() );
235
236 // query local link info
237 LinkDescriptor& ld = queryLocalLink(lid);
238 if( ld.isUnspecified() ){
239 logging_error( "Don't know the link with id " << lid.toString() );
240 return -1;
241 }
242
243 // link not up-> error
244 if( !ld.up ) {
245 logging_error("Can not send on link " << lid.toString() << ": link not up");
246 return -1;
247 }
248
249 // create message
250 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
251
252 // encapsulate the payload message
253 msg.encapsulate( const_cast<Message*>(message) );
254
255 // send message
256 send( &msg, ld );
257
258 // return sequence number
259 return ++currentSeqnum;
260}
261
262const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
263 if( link.isUnspecified() ){
264 return localDescriptor;
265 } else {
266 LinkDescriptor& linkDesc = queryLocalLink(link);
267 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
268 return linkDesc.remoteEndpoint;
269 }
270}
271
272void BaseCommunication::registerEventListener(CommunicationEvents* _events){
273 if( eventListener.find( _events ) == eventListener.end() )
274 eventListener.insert( _events );
275}
276
277void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
278 EventListenerSet::iterator i = eventListener.find( _events );
279 if( i != eventListener.end() )
280 eventListener.erase( i );
281}
282
283SystemEventType TransportEvent("Transport");
284SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
285
286/// called when a system event is emitted by system queue
287void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
288
289 // dispatch received messages
290 if ( event.getType() == MessageDispatchEvent ){
291 logging_debug( "Forwarding message receiver" );
292 boost::function0<void>* handler = event.getData< boost::function0<void> >();
293 (*handler)();
294 delete handler;
295 }
296}
297
298/**
299 * called within the ASIO thread
300 * when a message is received from underlay transport
301 */
302void BaseCommunication::receive_message(transport_connection::sptr connection,
303 reboost::message_t msg) {
304
305 logging_debug( "Dispatching message" );
306
307 boost::function0<void>* handler = new boost::function0<void>(
308 boost::bind(
309 &BaseCommunication::receiveMessage,
310 this,
311 connection,
312 msg)
313 );
314
315 SystemQueue::instance().scheduleEvent(
316 SystemEvent(this, MessageDispatchEvent, handler)
317 );
318}
319
320/**
321 * called within the ARIBA thread (System Queue)
322 * when a message is received from underlay transport
323 */
324void BaseCommunication::receiveMessage(transport_connection::sptr connection,
325 reboost::message_t message)
326{
327
328 //// Adapt to old message system ////
329 // Copy data
330 size_t bytes_len = message.size();
331 uint8_t* bytes = new uint8_t[bytes_len];
332 message.read(bytes, 0, bytes_len);
333
334 Data data(bytes, bytes_len * 8);
335
336 Message legacy_message;
337 legacy_message.setPayload(data);
338
339
340
341 /// decapsulate message
342 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
343 logging_debug( "Receiving message of type " << msg->getTypeString() );
344
345 // handle message
346 switch (msg->getType()) {
347
348 // ---------------------------------------------------------------------
349 // data message
350 // ---------------------------------------------------------------------
351 case AribaBaseMsg::typeData: {
352 logging_debug( "Received data message, forwarding to overlay" );
353 if( messageReceiver != NULL ) {
354 messageReceiver->receiveMessage(
355 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
356 );
357 }
358 break;
359 }
360
361 // ---------------------------------------------------------------------
362 // handle link request from remote
363 // ---------------------------------------------------------------------
364 case AribaBaseMsg::typeLinkRequest: {
365 logging_debug( "Received link open request" );
366
367 /// not the correct peer id-> skip request
368 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
369 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
370 logging_info("Received link request for "
371 << msg->getRemoteDescriptor().getPeerId().toString()
372 << "but i'm "
373 << localDescriptor.getPeerId()
374 << ": Ignoring!");
375 break;
376 }
377
378 /// only answer the first request
379 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
380 logging_debug("Link request already received. Ignore!");
381 break;
382 }
383
384 /// create link ids
385 LinkID localLink = LinkID::create();
386 LinkID remoteLink = msg->getLocalLink();
387 logging_debug(
388 "local=" << connection->getLocalEndpoint()->to_string()
389 << " remote=" << connection->getRemoteEndpoint()->to_string()
390 );
391
392 // check if link creation is allowed by ALL listeners
393 bool allowlink = true;
394 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
395 allowlink &= i->onLinkRequest( localLink,
396 connection->getLocalEndpoint(),
397 connection->getRemoteEndpoint());
398 }
399
400 // not allowed-> warn
401 if( !allowlink ){
402 logging_warn( "Overlay denied creation of link" );
403 delete msg;
404 return;
405 }
406
407 // create descriptor
408 LinkDescriptor* ld = new LinkDescriptor();
409 ld->localLink = localLink;
410 ld->remoteLink = remoteLink;
411 ld->localLocator = connection->getLocalEndpoint()->clone();
412 ld->remoteLocator = connection->getRemoteEndpoint()->clone();
413 ld->connection = connection;
414 ld->remoteEndpoint = msg->getLocalDescriptor();
415 add_endpoint(ld->remoteLocator);
416
417 // add layer 1-3 addresses
418 ld->remoteEndpoint.getEndpoints().add(
419 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
420 localDescriptor.getEndpoints().add(
421 connection->getLocalEndpoint(),
422 endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
423
424 // link is now up-> add it
425 ld->up = true;
426 addLink(ld);
427
428 // link is up!
429 logging_debug( "Link (initiated from remote) is up with "
430 << "local(id=" << ld->localLink.toString() << ","
431 << "locator=" << ld->localLocator->to_string() << ") "
432 << "remote(id=" << ld->remoteLink.toString() << ", "
433 << "locator=" << ld->remoteLocator->to_string() << ")"
434 );
435
436 // sending link request reply
437 logging_debug( "Sending link request reply with ids "
438 << "local=" << localLink.toString() << ", "
439 << "remote=" << remoteLink.toString() );
440 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
441 reply.getLocalDescriptor() = localDescriptor;
442 reply.getRemoteDescriptor() = ld->remoteEndpoint;
443
444 send( &reply, *ld );
445
446 // inform listeners about new open link
447 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
448 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
449 }
450
451 // done
452 break;
453 }
454
455 // ---------------------------------------------------------------------
456 // handle link request reply
457 // ---------------------------------------------------------------------
458 case AribaBaseMsg::typeLinkReply: {
459 logging_debug( "Received link open reply for a link we initiated" );
460
461 // this is a reply to a link open request, so we have already
462 // a link mapping and can now set the remote link to valid
463 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
464
465 // no link found-> warn!
466 if (ld.isUnspecified()) {
467 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
468 delete msg;
469 return;
470 }
471
472 // store the connection
473 ld.connection = connection;
474
475 // set remote locator and link id
476 ld.remoteLink = msg->getLocalLink();
477 ld.remoteLocator = connection->getRemoteEndpoint()->clone();
478 ld.remoteEndpoint.getEndpoints().add(
479 msg->getLocalDescriptor().getEndpoints(),
480 endpoint_set::Layer1_4
481 );
482
483 localDescriptor.getEndpoints().add(
484 msg->getRemoteDescriptor().getEndpoints(),
485 endpoint_set::Layer1_3
486 );
487 ld.up = true;
488 add_endpoint(ld.remoteLocator);
489
490 logging_debug( "Link is now up with local id "
491 << ld.localLink.toString() << " and remote id "
492 << ld.remoteLink.toString() );
493
494
495 // inform lisneters about link up event
496 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
497 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
498 }
499
500 // done
501 break;
502 }
503
504 // ---------------------------------------------------------------------
505 // handle link close requests
506 // ---------------------------------------------------------------------
507 case AribaBaseMsg::typeLinkClose: {
508 // get remote link
509 const LinkID& localLink = msg->getRemoteLink();
510 logging_debug( "Received link close request for link " << localLink.toString() );
511
512 // searching for link, not found-> warn
513 LinkDescriptor& linkDesc = queryLocalLink( localLink );
514 if (linkDesc.isUnspecified()) {
515 logging_warn("Failed to find local link " << localLink.toString());
516 delete msg;
517 return;
518 }
519
520 // inform listeners
521 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
522 i->onLinkDown( linkDesc.localLink,
523 linkDesc.localLocator, linkDesc.remoteLocator );
524 }
525
526 // remove the link descriptor
527 removeLink( localLink );
528
529 // done
530 break;
531 }
532
533 // ---------------------------------------------------------------------
534 // handle link locator changes
535 // ---------------------------------------------------------------------
536 case AribaBaseMsg::typeLinkUpdate: {
537 const LinkID& localLink = msg->getRemoteLink();
538 logging_debug( "Received link update for link "
539 << localLink.toString() );
540
541 // find the link description
542 LinkDescriptor& linkDesc = queryLocalLink( localLink );
543 if (linkDesc.isUnspecified()) {
544 logging_warn("Failed to update local link "
545 << localLink.toString());
546 delete msg;
547 return;
548 }
549
550 // update the remote locator
551 const address_v* oldremote = linkDesc.remoteLocator;
552 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
553
554 // inform the listeners (local link has _not_ changed!)
555 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
556 i->onLinkChanged(
557 linkDesc.localLink, // linkid
558 linkDesc.localLocator, // old local
559 linkDesc.localLocator, // new local
560 oldremote, // old remote
561 linkDesc.remoteLocator // new remote
562 );
563 }
564
565 // done
566 break;
567 }
568 }
569
570 delete msg;
571}
572
573/// add a newly allocated link to the set of links
574void BaseCommunication::addLink( LinkDescriptor* link ) {
575 linkSet.push_back( link );
576}
577
578/// remove a link from set
579void BaseCommunication::removeLink( const LinkID& localLink ) {
580 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
581 if( (*i)->localLink != localLink) continue;
582 remove_endpoint((*i)->remoteLocator);
583 delete *i;
584 linkSet.erase( i );
585 break;
586 }
587}
588
589/// query a descriptor by local link id
590BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
591 for (size_t i=0; i<linkSet.size();i++)
592 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
593
594 return LinkDescriptor::UNSPECIFIED();
595}
596
597/// query a descriptor by remote link id
598BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
599 for (size_t i=0; i<linkSet.size();i++)
600 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
601
602 return LinkDescriptor::UNSPECIFIED();
603}
604
605LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
606 LinkIDs ids;
607 for (size_t i=0; i<linkSet.size(); i++){
608 if( addr == NULL ){
609 ids.push_back( linkSet[i]->localLink );
610 } else {
611 if ( *linkSet[i]->remoteLocator == *addr )
612 ids.push_back( linkSet[i]->localLink );
613 }
614 }
615 return ids;
616}
617
618void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
619
620#ifdef UNDERLAY_OMNET
621
622 // we have no mobility support for simulations
623 return
624
625#endif // UNDERLAY_OMNET
626
627/*- disabled!
628
629 // we only care about address changes, not about interface changes
630 // as address changes are triggered by interface changes, we are safe here
631 if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
632 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
633
634 logging_info( "base communication is handling network address changes" );
635
636 // get all now available addresses
637 NetworkInformation networkInformation;
638 AddressInformation addressInformation;
639
640 NetworkInterfaceList interfaces = networkInformation.getInterfaces();
641 AddressList addresses;
642
643 for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
644 AddressList newaddr = addressInformation.getAddresses(*i);
645 addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
646 }
647
648 //
649 // get current locators for the local endpoint
650 // TODO: this code is dublicate of the ctor code!!! cleanup!
651 //
652
653 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
654 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
655 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
656
657 //
658 // remember the old local endpoint, in case it changes
659 //
660
661 EndpointDescriptor oldLocalDescriptor( localDescriptor );
662
663 //
664 // look for local locators that we can use in communication
665 //
666 // choose the first locator that is not localhost
667 //
668
669 bool foundLocator = false;
670 bool changedLocator = false;
671
672 for( ; i != iend; i++){
673 logging_debug( "local locator found " << (*i)->toString() );
674 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
675
676 if( *ipv4locator != IPv4Locator::LOCALHOST &&
677 *ipv4locator != IPv4Locator::ANY &&
678 *ipv4locator != IPv4Locator::BROADCAST ){
679
680 ipv4locator->setPort( listenport );
681 changedLocator = *localDescriptor.locator != *ipv4locator;
682 localDescriptor.locator = ipv4locator;
683 logging_info( "binding to addr = " << ipv4locator->toString() );
684 foundLocator = true;
685 break;
686 }
687 } // for( ; i != iend; i++)
688
689 //
690 // if we found no locator, bind to localhost
691 //
692
693 if( !foundLocator ){
694 changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
695 localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
696 ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
697 logging_info( "found no good local lcoator, binding to addr = " <<
698 localDescriptor.locator->toString() );
699 }
700
701 //
702 // if we have connections that have no more longer endpoints
703 // close these. they will be automatically built up again.
704 // also update the local locator in the linkset mapping
705 //
706
707 if( changedLocator ){
708
709 logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
710 ", resettings connections that end at old locator " <<
711 oldLocalDescriptor.toString());
712
713 LinkSet::iterator i = linkSet.begin();
714 LinkSet::iterator iend = linkSet.end();
715
716 for( ; i != iend; i++ ){
717
718 logging_debug( "checking connection for locator change: " <<
719 " local " << (*i).localLocator->toString() <<
720 " old " << oldLocalDescriptor.locator->toString() );
721
722 if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
723
724 logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
725 transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
726
727 (*i).localLocator = localDescriptor.locator;
728 }
729 } // for( ; i != iend; i++ )
730
731 // wait 500ms to give the sockets time to shut down
732 usleep( 500000 );
733
734 } else {
735
736 logging_debug( "locator has not changed, not resetting connections" );
737
738 }
739
740 //
741 // handle the connections that have no longer any
742 // valid locator. send update messages with the new
743 // locator, so the remote node updates its locator/link mapping
744 //
745
746 LinkSet::iterator iAffected = linkSet.begin();
747 LinkSet::iterator endAffected = linkSet.end();
748
749 for( ; iAffected != endAffected; iAffected++ ){
750 LinkDescriptor descr = *iAffected;
751 logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
752
753 AribaBaseMsg updateMsg( descr.remoteLocator,
754 AribaBaseMsg::LINK_STATE_UPDATE,
755 descr.localLink, descr.remoteLink );
756
757 transport->sendMessage( &updateMsg );
758 }
759*/
760}
761
762/// sends a message to all end-points in the end-point descriptor
763void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
764 Data data = data_serialize(legacy_message, DEFAULT_V);
765
766 //// Adapt to new message system ////
767 // transfer data buffer ownership to the shared_buffer
768 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
769
770 reboost::message_t message;
771 message.push_back(buf);
772
773 transport->send(endpoint.getEndpoints(), message);
774}
775
776/// sends a message to the remote locator inside the link descriptor
777void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
778 if (desc.remoteLocator==NULL) return;
779
780 Data data = data_serialize(legacy_message, DEFAULT_V);
781
782 //// Adapt to new message system ////
783 // transfer data buffer ownership to the shared_buffer
784 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
785
786 reboost::message_t message;
787 message.push_back(buf);
788
789 desc.connection->send(message);
790}
791
792}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.