close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/BaseCommunication.cpp@ 12524

Last change on this file since 12524 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 29.4 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseCommunication.h"
40
41#include "networkinfo/AddressDiscovery.h"
42#include "ariba/utility/types/PeerID.h"
43#include "ariba/utility/system/SystemQueue.h"
44#include <boost/function.hpp>
45
46#ifdef UNDERLAY_OMNET
47 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
48 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
49 #include "ariba/utility/system/StartupWrapper.h"
50
51 using ariba::communication::AribaOmnetModule;
52 using ariba::communication::OmnetNetworkProtocol;
53 using ariba::utility::StartupWrapper;
54#endif
55
56namespace ariba {
57namespace communication {
58
59using namespace ariba::addressing2;
60
61using ariba::utility::PeerID;
62using ariba::utility::SystemQueue;
63
64use_logging_cpp(BaseCommunication);
65
66
67BaseCommunication::BaseCommunication()
68 : currentSeqnum( 0 ),
69 transport( NULL ),
70 messageReceiver( NULL ),
71 started( false ),
72 listenOn_endpoints(new addressing2::endpoint_set())
73{
74}
75
76
77BaseCommunication::~BaseCommunication(){
78}
79
80
81void BaseCommunication::start(EndpointSetPtr listen_on) {
82 assert ( ! started );
83
84 listenOn_endpoints = listen_on;
85 logging_info("Setting local end-points: " << listenOn_endpoints->to_string());
86
87 logging_info( "Starting up ..." );
88 currentSeqnum = 0;
89
90 // creating transports
91 // ---> transport_peer holds the set of the active endpoints we're listening on
92 logging_info( "Creating transports ..." );
93 transport = new transport_peer();
94 active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints);
95 logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX
96
97 logging_info( "Searching for local locators ..." );
98 local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints);
99 if ( local_endpoints->count() > 0 )
100 {
101 logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() );
102 }
103 else
104 {
105 logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!");
106
107 // TODO notify application, so that it may react properly. throw exception..?
108 assert( false );
109 }
110
111
112 // create local EndpointDescriptor
113 // ---> localDescriptor hold the set endpoints that can be used to reach us
114 localDescriptor.getPeerId() = PeerID::random();
115 localDescriptor.replace_endpoint_set(local_endpoints);
116 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
117
118 // start transport_peer
119 transport->register_listener( this );
120 transport->start();
121
122 // bind to the network change detection
123 networkMonitor.registerNotification( this );
124
125 // base comm startup done
126 started = true;
127 logging_info( "Started up." );
128}
129
130void BaseCommunication::stop() {
131 logging_info( "Stopping transports ..." );
132
133 transport->stop();
134 delete transport;
135 started = false;
136
137 logging_info( "Stopped." );
138}
139
140bool BaseCommunication::isStarted(){
141 return started;
142}
143
144const LinkID BaseCommunication::establishLink(
145 const EndpointDescriptor& descriptor,
146 const LinkID& link_id,
147 const QoSParameterSet& qos,
148 const SecurityParameterSet& sec) {
149
150 // copy link id
151 LinkID linkid = link_id;
152
153 // debug
154 logging_debug( "Request to establish link" );
155
156 // create link identifier
157 if (linkid.isUnspecified()) linkid = LinkID::create();
158
159 // create link descriptor
160 logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
161 LinkDescriptor* ld = new LinkDescriptor();
162 ld->localLink = linkid;
163 addLink( ld );
164
165
166 /* send a message to request new link to remote */
167 logging_debug( "Send messages with request to open link to " << descriptor.toString() );
168
169 /*
170 * Create Link-Request Message:
171 * NOTE: - Their PeerID (in parent message)
172 * - Our LinkID
173 * - Our PeerID
174 * - Our EndpointDescriptor
175 */
176 reboost::message_t linkmsg;
177 linkmsg.push_back(linkid.serialize());
178 linkmsg.push_back(localDescriptor.getPeerId().serialize());
179 linkmsg.push_back(localDescriptor.endpoints->serialize());
180
181// // XXX AKTUELL BUG FINDING...
182// reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize();
183// EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet();
184// xxx_set->deserialize(xxx);
185// cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl;
186// cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl;
187
188 // send message
189 // TODO move enum to BaseComm
190 send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg,
191 descriptor, system_priority::OVERLAY);
192
193 return linkid;
194}
195
196void BaseCommunication::dropLink(const LinkID link) {
197
198 logging_debug( "Starting to drop link " + link.toString() );
199
200 // see if we have the link
201 LinkDescriptor& ld = queryLocalLink( link );
202 if( ld.isUnspecified() ) {
203 logging_error( "Don't know the link you want to drop "+ link.toString() );
204 return;
205 }
206
207 // tell the registered listeners
208 foreach( CommunicationEvents* i, eventListener ) {
209 i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
210 }
211
212
213 // * send message to drop the link *
214 logging_debug( "Sending out link close request. for us, the link is closed now" );
215 reboost::message_t empty_message;
216 send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY );
217
218 // remove from map
219 removeLink(link);
220}
221
222
223seqnum_t BaseCommunication::sendMessage( const LinkID& lid,
224 reboost::message_t message,
225 uint8_t priority,
226 bool bypass_overlay) throw(communication_message_not_sent)
227{
228 // message type: direct data or (normal) data
229 AribaBaseMsg::type_ type;
230 if ( bypass_overlay )
231 {
232 type = AribaBaseMsg::typeDirectData;
233 logging_debug( "Sending out direct-message to link " << lid.toString() );
234 }
235 else
236 {
237 type = AribaBaseMsg::typeData;
238 logging_debug( "Sending out message to link " << lid.toString() );
239 }
240
241
242 // query local link info
243 LinkDescriptor& ld = queryLocalLink(lid);
244 if( ld.isUnspecified() )
245 {
246 throw communication_message_not_sent("Don't know the link with id "
247 + lid.toString());
248 }
249
250 // link not up-> error
251 if( !ld.up )
252 {
253 throw communication_message_not_sent("Can not send on link "
254 + lid.toString() + ": link not up");
255 }
256
257
258 // * send message *
259 bool okay = send_over_link( type, message, ld, priority );
260
261 if ( ! okay )
262 {
263 throw communication_message_not_sent("send_over_link failed!");
264 }
265
266 return ++currentSeqnum;
267}
268
269const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
270 if( link.isUnspecified() ){
271 return localDescriptor;
272 } else {
273 LinkDescriptor& linkDesc = queryLocalLink(link);
274 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
275 return linkDesc.remoteDescriptor;
276 }
277}
278
279void BaseCommunication::registerEventListener(CommunicationEvents* _events){
280 if( eventListener.find( _events ) == eventListener.end() )
281 eventListener.insert( _events );
282}
283
284void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
285 EventListenerSet::iterator i = eventListener.find( _events );
286 if( i != eventListener.end() )
287 eventListener.erase( i );
288}
289
290
291
292/*------------------------------
293 | ASIO thread --> SystemQueue |
294 ------------------------------*/
295
296/// ASIO thread
297void BaseCommunication::receive_message(transport_connection::sptr connection,
298 reboost::shared_buffer_t msg) {
299
300 logging_debug( "Dispatching message" );
301
302 SystemQueue::instance().scheduleCall(
303 boost::bind(
304 &BaseCommunication::receiveMessage,
305 this,
306 connection,
307 msg)
308 );
309}
310
311/// ASIO thread
312void BaseCommunication::connection_terminated(transport_connection::sptr connection)
313{
314 SystemQueue::instance().scheduleCall(
315 boost::bind(
316 &BaseCommunication::connectionTerminated,
317 this,
318 connection)
319 );
320}
321
322/*--------------------------------
323 | [ASIO thread --> SystemQueue] |
324 -------------------------------*/
325
326/// ARIBA thread (System Queue)
327void BaseCommunication::connectionTerminated(transport_connection::sptr connection)
328{
329 vector<LinkID*> links = connection->get_communication_links();
330
331 logging_debug("[BaseCommunication] Connection terminated: "
332 << connection->getLocalEndpoint()->to_string()
333 << " <--> " << connection->getRemoteEndpoint()->to_string()
334 << " (" << links.size() << " links)");
335
336 // remove all links that used the terminated connection
337 for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it )
338 {
339 LinkID& link_id = **it;
340
341 logging_debug(" ---> Removing link: " << link_id.toString());
342
343 // searching for link, not found-> warn
344 LinkDescriptor& linkDesc = queryLocalLink( link_id );
345 if (linkDesc.isUnspecified()) {
346 logging_warn("Failed to find local link " << link_id.toString());
347 continue;
348 }
349
350 // inform listeners
351 foreach( CommunicationEvents* i, eventListener ){
352 i->onLinkFail( linkDesc.localLink,
353 linkDesc.localLocator, linkDesc.remoteLocator );
354 }
355
356 // remove the link descriptor
357 removeLink( link_id );
358 }
359}
360
361/// ARIBA thread (System Queue)
362void BaseCommunication::receiveMessage(transport_connection::sptr connection,
363 reboost::shared_buffer_t message)
364{
365 // XXX
366 logging_debug("/// [receiveMessage] buffersize: " << message.size());
367
368 // get type
369 uint8_t type = message.data()[0];
370 reboost::shared_buffer_t sub_buff = message(1);
371
372 // get link id
373 LinkID link_id;
374 if ( type != AribaBaseMsg::typeLinkRequest)
375 {
376 sub_buff = link_id.deserialize(sub_buff);
377 }
378
379 // handle message
380 switch ( type )
381 {
382 // ---------------------------------------------------------------------
383 // data message
384 // ---------------------------------------------------------------------
385 case AribaBaseMsg::typeData:
386 {
387 logging_debug( "Received data message, forwarding to overlay." );
388 if( messageReceiver != NULL )
389 {
390 messageReceiver->receiveMessage(
391 sub_buff, link_id, NodeID::UNSPECIFIED, false
392 );
393 }
394
395 break;
396 }
397
398 // ---------------------------------------------------------------------
399 // direct data message (bypass overlay-layer)
400 // ---------------------------------------------------------------------
401 case AribaBaseMsg::typeDirectData:
402 {
403 logging_debug( "Received direct data message, forwarding to application." );
404
405 if( messageReceiver != NULL )
406 {
407 messageReceiver->receiveMessage(
408 sub_buff, link_id, NodeID::UNSPECIFIED, true
409 );
410 }
411
412 break;
413 }
414
415
416
417 // ---------------------------------------------------------------------
418 // handle link request from remote
419 // ---------------------------------------------------------------------
420 case AribaBaseMsg::typeLinkRequest:
421 {
422 logging_debug( "Received link open request on "
423 << connection->getLocalEndpoint()->to_string() );
424
425 /*
426 * Deserialize Peer Message
427 * - Our PeerID
428 */
429 PeerID our_peer_id;
430 sub_buff = our_peer_id.deserialize(sub_buff);
431
432 /// not the correct peer id-> skip request
433 if ( our_peer_id != localDescriptor.getPeerId() &&
434 ! our_peer_id.isUnspecified() /* overlay bootstrap */ )
435 {
436 logging_info("Received link request for "
437 << our_peer_id.toString()
438 << "but i'm "
439 << localDescriptor.getPeerId()
440 << ": Ignoring!");
441
442 // TODO terminate connection?
443
444 break;
445 }
446
447
448 /*
449 * Deserialize Link-Request Message:
450 * - Their LinkID
451 * - Their PeerID
452 * - Their EndpointDescriptor
453 */
454 LinkID their_link_id;
455 PeerID their_peer_id;
456 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
457 sub_buff = their_link_id.deserialize(sub_buff);
458 sub_buff = their_peer_id.deserialize(sub_buff);
459 sub_buff = their_endpoints->deserialize(sub_buff);
460 /* [ Deserialize Link-Request Message ] */
461
462
463 /// only answer the first request
464 if (!queryRemoteLink(their_link_id).isUnspecified())
465 {
466
467 // TODO aktuell: When will these connections be closed?
468 // ---> Close it now (if it services no links) ?
469 // (see also ! allowlink below)
470
471 // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only!
472 if ( connection->get_communication_links().size() == 0 )
473 {
474 connection->terminate();
475 }
476
477 logging_debug("Link request already received. Ignore!");
478 break;
479 }
480
481 /// create link ids
482 LinkID localLink = LinkID::create();
483 LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary
484 logging_debug(
485 "local=" << connection->getLocalEndpoint()->to_string()
486 << " remote=" << connection->getRemoteEndpoint()->to_string()
487 );
488
489 // check if link creation is allowed by ALL listeners
490 bool allowlink = true;
491 foreach( CommunicationEvents* i, eventListener ){
492 allowlink &= i->onLinkRequest( localLink,
493 connection->getLocalEndpoint(),
494 connection->getRemoteEndpoint());
495 }
496
497 // not allowed-> warn
498 if( !allowlink ){
499 logging_warn( "Overlay denied creation of link" );
500 return;
501 }
502
503 // create descriptor
504 LinkDescriptor* ld = new LinkDescriptor();
505 ld->localLink = localLink;
506 ld->remoteLink = remoteLink;
507 ld->localLocator = connection->getLocalEndpoint();
508 ld->remoteLocator = connection->getRemoteEndpoint();
509 ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints);
510 ld->set_connection(connection);
511
512
513 // update endpoints (should only have any effect in case of NAT)
514 ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
515// localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0
516
517 // link is now up-> add it
518 ld->up = true;
519 addLink(ld);
520
521
522
523 /* sending link reply */
524 logging_debug( "Sending link reply with ids "
525 << "local=" << localLink.toString() << ", "
526 << "remote=" << remoteLink.toString() );
527
528 /*
529 * Create Link-Reply Message:
530 * - Our LinkID
531 * - Our Endpoint_Set (as update)
532 * - Their EndpointDescriptor (maybe they learn something about NAT)
533 */
534 reboost::message_t linkmsg;
535 linkmsg.push_back(localLink.serialize());
536 linkmsg.push_back(localDescriptor.endpoints->serialize());
537 linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize());
538
539 // XXX
540 cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl;
541
542 // send message
543 bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY );
544
545 if ( ! sent )
546 {
547 logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string());
548
549 // TODO remove link, close link, ..?
550
551 break;
552 }
553
554
555 // link is up!
556 logging_debug( "Link (initiated from remote) is up with "
557 << "local(id=" << ld->localLink.toString() << ","
558 << "locator=" << ld->localLocator->to_string() << ") "
559 << "remote(id=" << ld->remoteLink.toString() << ", "
560 << "locator=" << ld->remoteLocator->to_string() << ")"
561 );
562
563 // inform listeners about new open link
564 foreach( CommunicationEvents* i, eventListener ) {
565 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
566 }
567
568 // done
569 break;
570 }
571
572 // ---------------------------------------------------------------------
573 // handle link request reply
574 // ---------------------------------------------------------------------
575 case AribaBaseMsg::typeLinkReply:
576 {
577 logging_debug( "Received link open reply for a link we initiated" );
578
579 /*
580 * Deserialize Link-Reply Message:
581 * - Their LinkID
582 * - Their Endpoint_Set (as update)
583 * - Our EndpointDescriptor (maybe we can learn something about NAT)
584 */
585 LinkID their_link_id;
586 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
587 EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet();
588 sub_buff = their_link_id.deserialize(sub_buff);
589 sub_buff = their_endpoints->deserialize(sub_buff);
590 sub_buff = our_endpoints->deserialize(sub_buff);
591
592
593 // this is a reply to a link open request, so we have already
594 // a link mapping and can now set the remote link to valid
595 LinkDescriptor& ld = queryLocalLink( link_id );
596
597 // no link found-> warn!
598 if (ld.isUnspecified()) {
599 logging_warn("Failed to find local link " << link_id.toString());
600 return;
601 }
602
603 if ( ld.up )
604 {
605 logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString());
606
607 // TODO send LinkClose ?
608 return;
609 }
610
611 // store the connection
612 ld.set_connection(connection);
613
614 // set remote locator and link id
615 ld.remoteLink = their_link_id;
616 ld.remoteLocator = connection->getRemoteEndpoint();
617
618
619 /* Update endpoints */
620 // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information.
621 ld.remoteDescriptor.replace_endpoint_set(their_endpoints);
622
623 // add actual remote endpoint to this set (should only have any effect in case of NAT)
624 ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
625
626 // TODO In case of NAT, we could learn something about our external IP.
627 // ---> But we must trust the remote peer about this information!!
628// localDescriptor.endpoints->add_endpoints(our_endpoints);
629
630
631
632
633 ld.up = true;
634
635 logging_debug( "Link is now up with local id "
636 << ld.localLink.toString() << " and remote id "
637 << ld.remoteLink.toString() );
638
639
640 // inform lisneters about link up event
641 foreach( CommunicationEvents* i, eventListener ){
642 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
643 }
644
645 // done
646 break;
647 }
648
649 // ---------------------------------------------------------------------
650 // handle link close requests
651 // ---------------------------------------------------------------------
652 case AribaBaseMsg::typeLinkClose: {
653 // get remote link
654// const LinkID& localLink = msg.getRemoteLink();
655 logging_debug( "Received link close request for link " << link_id.toString() );
656
657 // searching for link, not found-> warn
658 LinkDescriptor& linkDesc = queryLocalLink( link_id );
659 if (linkDesc.isUnspecified()) {
660 logging_warn("Failed to find local link " << link_id.toString());
661 return;
662 }
663
664 // inform listeners
665 foreach( CommunicationEvents* i, eventListener ){
666 i->onLinkDown( linkDesc.localLink,
667 linkDesc.localLocator, linkDesc.remoteLocator );
668 }
669
670 // remove the link descriptor
671 removeLink( link_id );
672
673 // done
674 break;
675 }
676
677 // ---------------------------------------------------------------------
678 // handle link locator changes -- TODO is this ever called..?
679 // ---------------------------------------------------------------------
680// case AribaBaseMsg::typeLinkUpdate: {
681// const LinkID& localLink = msg.getRemoteLink();
682// logging_debug( "Received link update for link "
683// << localLink.toString() );
684//
685// // find the link description
686// LinkDescriptor& linkDesc = queryLocalLink( localLink );
687// if (linkDesc.isUnspecified()) {
688// logging_warn("Failed to update local link "
689// << localLink.toString());
690// return;
691// }
692//
693// // update the remote locator
694// addressing2::EndpointPtr oldremote = linkDesc.remoteLocator;
695// linkDesc.remoteLocator = connection->getRemoteEndpoint();
696//
697// // TODO update linkDesc.connection ?
698//
699// // inform the listeners (local link has _not_ changed!)
700// foreach( CommunicationEvents* i, eventListener ){
701// i->onLinkChanged(
702// linkDesc.localLink, // linkid
703// linkDesc.localLocator, // old local
704// linkDesc.localLocator, // new local
705// oldremote, // old remote
706// linkDesc.remoteLocator // new remote
707// );
708// }
709//
710// // done
711// break;
712// }
713
714
715 default: {
716 logging_warn( "Received unknown message type!" );
717 break;
718 }
719
720 }
721}
722
723/// add a newly allocated link to the set of links
724void BaseCommunication::addLink( LinkDescriptor* link ) {
725 linkSet.push_back( link );
726}
727
728/// remove a link from set
729void BaseCommunication::removeLink( const LinkID& localLink ) {
730 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
731 if( (*i)->localLink != localLink) continue;
732// remove_endpoint((*i)->remoteLocator); // XXX
733 delete *i;
734 linkSet.erase( i );
735 break;
736 }
737}
738
739/// query a descriptor by local link id
740BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
741 for (size_t i=0; i<linkSet.size();i++)
742 if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
743
744 return LinkDescriptor::UNSPECIFIED();
745}
746
747/// query a descriptor by remote link id
748BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
749 for (size_t i=0; i<linkSet.size();i++)
750 if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
751
752 return LinkDescriptor::UNSPECIFIED();
753}
754
755//LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
756// LinkIDs ids;
757// for (size_t i=0; i<linkSet.size(); i++){
758// if( addr == NULL ){
759// ids.push_back( linkSet[i]->localLink );
760// } else {
761// if ( *linkSet[i]->remoteLocator == *addr )
762// ids.push_back( linkSet[i]->localLink );
763// }
764// }
765// return ids;
766//}
767
768void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
769
770/*- disabled!
771
772 // we only care about address changes, not about interface changes
773 // as address changes are triggered by interface changes, we are safe here
774 if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
775 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
776
777 logging_info( "base communication is handling network address changes" );
778
779 // get all now available addresses
780 NetworkInformation networkInformation;
781 AddressInformation addressInformation;
782
783 NetworkInterfaceList interfaces = networkInformation.getInterfaces();
784 AddressList addresses;
785
786 for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
787 AddressList newaddr = addressInformation.getAddresses(*i);
788 addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
789 }
790
791 //
792 // get current locators for the local endpoint
793 // TODO: this code is dublicate of the ctor code!!! cleanup!
794 //
795
796 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
797 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
798 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
799
800 //
801 // remember the old local endpoint, in case it changes
802 //
803
804 EndpointDescriptor oldLocalDescriptor( localDescriptor );
805
806 //
807 // look for local locators that we can use in communication
808 //
809 // choose the first locator that is not localhost
810 //
811
812 bool foundLocator = false;
813 bool changedLocator = false;
814
815 for( ; i != iend; i++){
816 logging_debug( "local locator found " << (*i)->toString() );
817 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
818
819 if( *ipv4locator != IPv4Locator::LOCALHOST &&
820 *ipv4locator != IPv4Locator::ANY &&
821 *ipv4locator != IPv4Locator::BROADCAST ){
822
823 ipv4locator->setPort( listenport );
824 changedLocator = *localDescriptor.locator != *ipv4locator;
825 localDescriptor.locator = ipv4locator;
826 logging_info( "binding to addr = " << ipv4locator->toString() );
827 foundLocator = true;
828 break;
829 }
830 } // for( ; i != iend; i++)
831
832 //
833 // if we found no locator, bind to localhost
834 //
835
836 if( !foundLocator ){
837 changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
838 localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
839 ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
840 logging_info( "found no good local lcoator, binding to addr = " <<
841 localDescriptor.locator->toString() );
842 }
843
844 //
845 // if we have connections that have no more longer endpoints
846 // close these. they will be automatically built up again.
847 // also update the local locator in the linkset mapping
848 //
849
850 if( changedLocator ){
851
852 logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
853 ", resettings connections that end at old locator " <<
854 oldLocalDescriptor.toString());
855
856 LinkSet::iterator i = linkSet.begin();
857 LinkSet::iterator iend = linkSet.end();
858
859 for( ; i != iend; i++ ){
860
861 logging_debug( "checking connection for locator change: " <<
862 " local " << (*i).localLocator->toString() <<
863 " old " << oldLocalDescriptor.locator->toString() );
864
865 if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
866
867 logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
868 transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
869
870 (*i).localLocator = localDescriptor.locator;
871 }
872 } // for( ; i != iend; i++ )
873
874 // wait 500ms to give the sockets time to shut down
875 usleep( 500000 );
876
877 } else {
878
879 logging_debug( "locator has not changed, not resetting connections" );
880
881 }
882
883 //
884 // handle the connections that have no longer any
885 // valid locator. send update messages with the new
886 // locator, so the remote node updates its locator/link mapping
887 //
888
889 LinkSet::iterator iAffected = linkSet.begin();
890 LinkSet::iterator endAffected = linkSet.end();
891
892 for( ; iAffected != endAffected; iAffected++ ){
893 LinkDescriptor descr = *iAffected;
894 logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
895
896 AribaBaseMsg updateMsg( descr.remoteLocator,
897 AribaBaseMsg::LINK_STATE_UPDATE,
898 descr.localLink, descr.remoteLink );
899
900 transport->sendMessage( &updateMsg );
901 }
902*/
903}
904
905
906addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link(
907 const LinkID& linkid)
908{
909 LinkDescriptor& ld = queryLocalLink(linkid);
910
911 return ld.get_connection()->getLocalEndpoint();
912}
913
914addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link(
915 const LinkID& linkid)
916{
917 LinkDescriptor& ld = queryLocalLink(linkid);
918
919 return ld.get_connection()->getRemoteEndpoint();
920}
921
922
923
924bool BaseCommunication::send_over_link(
925 const uint8_t type,
926 reboost::message_t message,
927 const LinkDescriptor& desc,
928 const uint8_t priority)
929{
930 /*
931 * Create Link Message:
932 * - Type
933 * - Their LinkID
934 */
935 // link id
936 message.push_front(desc.remoteLink.serialize());
937 // type
938 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
939 /* [ Create Link Message ] */
940
941
942 /* send message */
943 transport_connection::sptr conn = desc.get_connection();
944 if ( ! conn )
945 {
946 cout << "/// MARIO: No connection!!" << endl; // XXX debug
947 return false;
948 }
949
950 // * send over connection *
951 return conn->send(message, priority);
952}
953
954void BaseCommunication::send_to_peer(
955 const uint8_t type,
956 const PeerID& peer_id,
957 reboost::message_t message,
958 const EndpointDescriptor& endpoint,
959 const uint8_t priority )
960{
961 /*
962 * Create Peer Message:
963 * - Type
964 * - Their PeerID
965 */
966 // peer id
967 message.push_front(peer_id.serialize());
968 // type
969 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
970
971
972 /* send message */
973 transport->send(endpoint.getEndpoints(), message, priority);
974}
975
976
977
978
979}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.