close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/BaseCommunication.cpp@ 2440

Last change on this file since 2440 was 2440, checked in by Christoph Mayer, 16 years ago

-memory leaks und corruption gefixt

File size: 19.9 KB
RevLine 
1// [Licence]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [Licence]
38
39#include "BaseCommunication.h"
40
41#ifdef UNDERLAY_OMNET
42 #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
43 #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
44 #include "ariba/utility/system/StartupWrapper.h"
45
46 using ariba::communication::AribaOmnetModule;
47 using ariba::communication::OmnetNetworkProtocol;
48 using ariba::utility::StartupWrapper;
49#endif
50
51namespace ariba {
52namespace communication {
53
54use_logging_cpp(BaseCommunication);
55const BaseCommunication::LinkDescriptor BaseCommunication::LinkDescriptor::UNSPECIFIED;
56
57BaseCommunication::BaseCommunication(const NetworkLocator* _locallocator, const uint16_t _listenport)
58 : messageReceiver( NULL ), currentSeqnum( 0 ), listenport( _listenport ) {
59
60 logging_info( "starting up base communication and creating transports ..." );
61 logging_info( "using port " << listenport );
62
63#ifdef UNDERLAY_OMNET
64 AribaOmnetModule* module = StartupWrapper::getCurrentModule();
65 module->setServerPort( listenport );
66
67 transport = module;
68 network = new OmnetNetworkProtocol( module );
69#else
70 transport = new TCPTransport( listenport );
71 network = new IPv4NetworkProtocol();
72#endif
73
74 logging_debug( "searching for local locators ..." );
75
76 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
77 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
78 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
79
80 //
81 // choose the first locator that is not localhost
82 //
83
84 bool foundLocator = false;
85
86 for( ; i != iend; i++){
87 logging_debug( "local locator found " + (*i)->toString() );
88 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
89
90 // TODO: which locators are find to bind to?
91 // localhost is not too bad, works when testing locally
92 // with several instances. the manual override currently
93 // enables to use an arbitrary address, guess this is fine.
94 // so the manual override also can use ANY, LOCALHOST, BROADCAST
95
96 if( *ipv4locator != IPv4Locator::LOCALHOST &&
97 *ipv4locator != IPv4Locator::ANY &&
98 *ipv4locator != IPv4Locator::BROADCAST ){
99
100 ipv4locator->setPort(listenport);
101 localDescriptor.locator = ipv4locator;
102 localDescriptor.isUnspec = false;
103 logging_info( "binding to addr = " + ipv4locator->toString() );
104 foundLocator = true;
105 break;
106 }
107 } // for( ; i != iend; i++)
108
109
110 if( _locallocator != NULL ) {
111 if( localDescriptor.locator != NULL) delete localDescriptor.locator;
112 localDescriptor.locator = new IPv4Locator( IPv4Locator::fromString( _locallocator->toString()) );
113 localDescriptor.isUnspec = false;
114 logging_debug( "manual locator override, using locator=" <<
115 localDescriptor.locator->toString() );
116 foundLocator = true;
117 }
118
119 // if we found no local locator, exit using logging fatal
120 if( !foundLocator )
121 logging_fatal( "did not find a useable local locator!" );
122
123 transport->addMessageReceiver( this );
124 transport->start();
125
126#ifndef UNDERLAY_OMNET
127 //
128 // bind to the network change detection
129 //
130
131 networkMonitor.registerNotification( this );
132#endif
133
134 //
135 // base comm startup done
136 //
137
138 logging_info( "base communication started up" );
139}
140
141BaseCommunication::~BaseCommunication() {
142
143 logging_info( "stopping base communication and transport ..." );
144
145 transport->stop();
146 delete transport;
147 delete network;
148
149 logging_info( "base communication stopped" );
150}
151
152const LinkID BaseCommunication::establishLink(
153 const EndpointDescriptor& descriptor,
154 const QoSParameterSet& qos,
155 const SecurityParameterSet& sec){
156
157 logging_debug( "request to establish link" );
158
159 //
160 // just use the first locator in the endp descriptors
161 //
162
163 if( descriptor.locator == NULL ){
164 logging_error( "invalid destination endpoint" );
165 return LinkID::UNSPECIFIED;
166 }
167
168 if( localDescriptor.locator == NULL ){
169 logging_error( "invalid local endpoint" );
170 return LinkID::UNSPECIFIED;
171 }
172
173 const NetworkLocator* remote = descriptor.locator;
174 const NetworkLocator* local = localDescriptor.locator;
175
176 //
177 // create link and link descriptor
178 //
179
180 LinkID linkid = LinkID::create();
181
182 logging_debug( "creating new local descriptor entry with local link id " + linkid.toString() );
183 LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor );
184 addLink( linkDescriptor );
185
186 //
187 // create a base msg with our link id and
188 // a request to open a link on the other side
189 //
190
191 logging_debug( "sending out base messages with request to open link to " + remote->toString() );
192 AribaBaseMsg baseMsg( remote, AribaBaseMsg::LINK_STATE_OPEN_REQUEST, linkid,
193 LinkID::UNSPECIFIED );
194 transport->sendMessage(&baseMsg);
195
196 return linkid;
197}
198
199void BaseCommunication::dropLink(const LinkID link) {
200
201 logging_debug( "starting to drop link " + link.toString() );
202
203 // see if we have the link
204 LinkDescriptor& descriptor = queryLocalLink( link );
205 if( descriptor.isUnspecified() ){
206 logging_error( "don't know the link you want to drop" );
207 return;
208 }
209
210 // create message to drop the link
211 logging_debug( "sending out link close request. for us, the link is closed now" );
212 AribaBaseMsg msg(
213 descriptor.remoteLocator,
214 AribaBaseMsg::LINK_STATE_CLOSE_REQUEST,
215 descriptor.localLink,
216 descriptor.remoteLink
217 );
218
219 // send message to drop the link
220 transport->sendMessage( &msg );
221
222 // tell the registered listeners
223 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
224 i->onLinkDown( link, descriptor.localLocator, descriptor.remoteLocator );
225 }
226
227 // remove from map
228 removeLink(link);
229}
230
231seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
232
233 logging_debug( "sending out message to link " + lid.toString() );
234
235 // query local link info
236 LinkDescriptor& linkDesc = queryLocalLink(lid);
237 if( linkDesc.isUnspecified() ){
238 logging_error( "don't know the link with id " + lid.toString() );
239 return 0;
240 }
241
242 // create message
243 AribaBaseMsg msg(
244 linkDesc.remoteLocator,
245 AribaBaseMsg::LINK_STATE_DATA,
246 linkDesc.localLink,
247 linkDesc.remoteLink
248 );
249
250 // encapsulate the payload message
251 msg.encapsulate( const_cast<Message*>(message) );
252
253 // send message
254 transport->sendMessage( &msg );
255
256 return ++currentSeqnum;
257}
258
259const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
260
261 if( link == LinkID::UNSPECIFIED){
262 return localDescriptor;
263 } else {
264 LinkDescriptor& linkDesc = queryLocalLink(link);
265 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED;
266 return linkDesc.remoteEndpoint;
267 }
268}
269
270void BaseCommunication::registerMessageReceiver(MessageReceiver* _receiver) {
271 messageReceiver = _receiver;
272}
273
274void BaseCommunication::unregisterMessageReceiver(MessageReceiver* _receiver) {
275 messageReceiver = NULL;
276}
277
278void BaseCommunication::registerEventListener(CommunicationEvents* _events){
279
280 if( eventListener.find( _events ) == eventListener.end() )
281 eventListener.insert( _events );
282}
283
284void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
285
286 EventListenerSet::iterator i = eventListener.find( _events );
287 if( i != eventListener.end() )
288 eventListener.erase( i );
289}
290
291
292bool BaseCommunication::receiveMessage(const Message* message, const LinkID& link, const NodeID& node){
293
294 //
295 // these messages arrive from the Transport module
296 // and are incoming network messages. Unpack the
297 // AribaBaseMsg and handle control packets,
298 // deliver data packets to the overlay
299 //
300
301 AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate<AribaBaseMsg>();
302 logging_debug( "receiving base comm message of type " + spovmsg->getTypeString() );
303
304 //
305 // deliver data to the overlays. we just give the
306 // inner packet to every registered overlay ...
307 //
308
309 if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_DATA ){
310
311 logging_debug( "received data message, forwarding to overlay" );
312
313 //
314 // put the linkid as address into the message
315 // and sent it to the receiver
316 //
317
318 if( messageReceiver != NULL ) {
319 messageReceiver->receiveMessage(
320 spovmsg,
321 spovmsg->getRemoteLink(),
322 NodeID::UNSPECIFIED
323 );
324 }
325
326 } // LINK_STATE_DATA
327
328 //
329 // handle link open requests
330 //
331
332 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REQUEST ){
333
334 logging_debug( "received link open request" );
335
336 //
337 // create a link context
338 //
339
340 // in an incoming packet the localLink is from
341 // the sender perspective local and from our
342 // perspective remote
343
344 logging_debug( "creating local link" );
345
346 LinkID localLink = LinkID::create();
347 LinkID remoteLink = spovmsg->getLocalLink();
348
349
350 const NetworkLocator* localLocator = dynamic_cast<const NetworkLocator*>(localDescriptor.locator);
351 const NetworkLocator* remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress());
352
353 logging_debug( "localLocator=" + localLocator->toString() + " remoteLocator="+remoteLocator->toString());
354
355 // ask the registered listeners if this link
356 // creation is fine. we will only allow the
357 // link if all of them agree
358
359 bool allowlink = true;
360 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
361 allowlink &= i->onLinkRequest( localLink, localLocator, remoteLocator );
362 }
363
364 if( !allowlink ){
365 logging_warn( "overlay denied creation of link" );
366 return true;
367 }
368
369 //
370 // create and save the descriptor for the link
371 //
372
373 LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink,
374 remoteLocator, EndpointDescriptor(remoteLocator));
375
376 logging_debug( "saving new link descriptor with " <<
377 "[local link " << localLink.toString() << "] " <<
378 "[local locator " << localLocator->toString() << "] " <<
379 "[remote link " << remoteLink.toString() << "] " <<
380 "[remote locator " << remoteLocator->toString() << "]" );
381
382 addLink( linkDescriptor );
383
384 //
385 // send out a link reply
386 //
387
388 logging_debug( "sending back link open reply for " <<
389 "[local link " << localLink.toString() << "] " <<
390 "[remote link " << remoteLink.toString() << "]" );
391
392 AribaBaseMsg reply(remoteLocator,
393 AribaBaseMsg::LINK_STATE_OPEN_REPLY,
394 localLink,
395 remoteLink);
396
397 transport->sendMessage( &reply );
398
399 //
400 // the link is now open
401 //
402
403 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
404 i->onLinkUp( localLink, localLocator, remoteLocator );
405 }
406
407 } // LINK_STATE_OPEN_REQUEST
408
409 //
410 // handle link open replies
411 //
412
413 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_OPEN_REPLY ){
414
415 logging_debug( "received link open reply for a link we initiated" );
416
417 // this is a reply to a link open request, so we have already
418 // a link mapping and can now set the remote link to valid
419 LinkDescriptor& linkDesc = queryLocalLink( spovmsg->getRemoteLink() );
420
421 if (linkDesc.isUnspecified()) {
422 logging_warn("Failed to find local link "+spovmsg->getRemoteLink().toString());
423 return false;
424 }
425
426 linkDesc.remoteLink = spovmsg->getLocalLink();
427 logging_debug( "the link is now up with local link id " + spovmsg->getRemoteLink().toString() );
428
429 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
430 i->onLinkUp( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
431 }
432
433 } // LINK_STATE_OPEN_REPLY
434
435 //
436 // handle link close requests
437 //
438
439 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_CLOSE_REQUEST ){
440
441 const LinkID& localLink = spovmsg->getRemoteLink();
442 logging_debug( "received link close request for link " + localLink.toString() );
443
444 //
445 // the link is closed immediately, we
446 // don't need to send out a reply, so we
447 // delete the mapping and inform
448 //
449
450 LinkDescriptor& linkDesc = queryLocalLink( localLink );
451 if (linkDesc.isUnspecified()) {
452 logging_warn("Failed to find local link "+localLink.toString());
453 return false;
454 }
455
456 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
457 i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
458 }
459
460 removeLink( localLink );
461
462 } // LINK_STATE_CLOSE_REQUEST
463
464 //
465 // handle locator updates
466 //
467
468 else if( spovmsg->getType() == AribaBaseMsg::LINK_STATE_UPDATE ){
469
470 const LinkID& localLink = spovmsg->getRemoteLink();
471 logging_debug( "received link update for link " + localLink.toString() );
472
473 //
474 // find the link description
475 //
476
477 LinkDescriptor& linkDesc = queryLocalLink( localLink );
478 if (linkDesc.isUnspecified()) {
479 logging_warn("Failed to update local link "+localLink.toString());
480 return false;
481 }
482
483 //
484 // update the remote locator
485 //
486
487 const NetworkLocator* oldremote = linkDesc.remoteLocator;
488 linkDesc.remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress());
489
490 //
491 // inform the listeners (local link has _not_ changed!)
492 //
493
494 BOOST_FOREACH( CommunicationEvents* i, eventListener ){
495 i->onLinkChanged(
496 linkDesc.localLink, // linkid
497 linkDesc.localLocator, // old local
498 linkDesc.localLocator, // new local
499 oldremote, // old remote
500 linkDesc.remoteLocator // new remote
501 );
502 }
503
504 } // LINK_STATE_UPDATE
505
506 return true;
507}
508
509void BaseCommunication::addLink( const LinkDescriptor& link ) {
510 linkSet.push_back( link );
511}
512
513void BaseCommunication::removeLink( const LinkID& localLink ) {
514
515 LinkSet::iterator i = linkSet.begin();
516 LinkSet::iterator iend = linkSet.end();
517
518 for( ; i != iend; i++){
519 if( (*i).localLink == localLink){
520 linkSet.erase( i );
521 break;
522 }
523 }
524}
525
526BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
527 for (int i=0; i<linkSet.size();i++)
528 if (linkSet[i].localLink == link) return (LinkDescriptor&)linkSet[i];
529 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED;
530}
531
532BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
533 for (int i=0; i<linkSet.size();i++)
534 if (linkSet[i].remoteLink == link) return (LinkDescriptor&)linkSet[i];
535 return (LinkDescriptor&)LinkDescriptor::UNSPECIFIED;
536}
537
538LinkIDs BaseCommunication::getLocalLinks( const EndpointDescriptor& ep ) const {
539 LinkIDs ids;
540
541 for (int i=0; i<linkSet.size(); i++){
542 if( ep == EndpointDescriptor::UNSPECIFIED ){
543 ids.push_back( linkSet[i].localLink );
544 } else {
545 if ( linkSet[i].remoteLocator == ep.locator )
546 ids.push_back( linkSet[i].localLink );
547 }
548 }
549
550 return ids;
551}
552
553void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
554
555#ifdef UNDERLAY_OMNET
556
557 // we have no mobility support for simulations
558 return
559
560#endif // UNDERLAY_OMNET
561
562 //
563 // we only care about address changes, not about interface changes
564 // as address changes are triggered by interface changes, we are safe here
565 //
566
567 if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
568 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
569
570 logging_info("base communication is handling network address changes");
571
572 //
573 // get all now available addresses
574 //
575
576 NetworkInformation networkInformation;
577 AddressInformation addressInformation;
578
579 NetworkInterfaceList interfaces = networkInformation.getInterfaces();
580 AddressList addresses;
581
582 for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
583 AddressList newaddr = addressInformation.getAddresses(*i);
584 addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
585 }
586
587 //
588 // get current locators for the local endpoint
589 // TODO: this code is dublicate of the ctor code!!! cleanup!
590 //
591
592 NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
593 NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
594 NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
595
596 //
597 // remember the old local endpoint, in case it changes
598 //
599
600 EndpointDescriptor oldLocalDescriptor( localDescriptor );
601
602 //
603 // look for local locators that we can use in communication
604 //
605 // choose the first locator that is not localhost
606 //
607
608 bool foundLocator = false;
609 bool changedLocator = false;
610
611 for( ; i != iend; i++){
612 logging_debug( "local locator found " + (*i)->toString() );
613 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
614
615 if( *ipv4locator != IPv4Locator::LOCALHOST &&
616 *ipv4locator != IPv4Locator::ANY &&
617 *ipv4locator != IPv4Locator::BROADCAST ){
618
619 ipv4locator->setPort( listenport );
620 changedLocator = *localDescriptor.locator != *ipv4locator;
621 localDescriptor.locator = ipv4locator;
622 logging_info( "binding to addr = " << ipv4locator->toString() );
623 foundLocator = true;
624 break;
625 }
626 } // for( ; i != iend; i++)
627
628 //
629 // if we found no locator, bind to localhost
630 //
631
632 if( !foundLocator ){
633 changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
634 localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
635 ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
636 logging_info( "found no good local lcoator, binding to addr = " <<
637 localDescriptor.locator->toString() );
638 }
639
640 //
641 // if we have connections that have no more longer endpoints
642 // close these. they will be automatically built up again.
643 // also update the local locator in the linkset mapping
644 //
645
646 if( changedLocator ){
647
648 logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
649 ", resettings connections that end at old locator " <<
650 oldLocalDescriptor.toString());
651
652 LinkSet::iterator i = linkSet.begin();
653 LinkSet::iterator iend = linkSet.end();
654
655 for( ; i != iend; i++ ){
656
657 logging_debug( "checking connection for locator change: " <<
658 " local " << (*i).localLocator->toString() <<
659 " old " << oldLocalDescriptor.locator->toString() );
660
661 if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
662
663 logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
664 transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
665
666 (*i).localLocator = localDescriptor.locator;
667 }
668 } // for( ; i != iend; i++ )
669
670 // wait 500ms to give the sockets time to shut down
671 usleep( 500000 );
672
673 } else {
674
675 logging_debug( "locator has not changed, not resetting connections" );
676
677 }
678
679 //
680 // handle the connections that have no longer any
681 // valid locator. send update messages with the new
682 // locator, so the remote node updates its locator/link mapping
683 //
684
685 LinkSet::iterator iAffected = linkSet.begin();
686 LinkSet::iterator endAffected = linkSet.end();
687
688 for( ; iAffected != endAffected; iAffected++ ){
689 LinkDescriptor descr = *iAffected;
690 logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
691
692 AribaBaseMsg updateMsg( descr.remoteLocator,
693 AribaBaseMsg::LINK_STATE_UPDATE,
694 descr.localLink, descr.remoteLink );
695
696 transport->sendMessage( &updateMsg );
697 }
698}
699
700}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.