An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/communication/BaseCommunication.cpp @ 12060

Last change on this file since 12060 was 12060, checked in by hock@…, 9 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue? can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery? discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg?)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers? (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 29.4 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseCommunication.h"
40
41#include "networkinfo/AddressDiscovery.h"
42#include "ariba/utility/types/PeerID.h"
43#include "ariba/utility/system/SystemQueue.h"
44#include <boost/function.hpp>
45
46#ifdef UNDERLAY_OMNET
47  #include "ariba/communication/modules/transport/omnet/AribaOmnetModule.h"
48  #include "ariba/communication/modules/network/omnet/OmnetNetworkProtocol.h"
49  #include "ariba/utility/system/StartupWrapper.h"
50
51  using ariba::communication::AribaOmnetModule;
52  using ariba::communication::OmnetNetworkProtocol;
53  using ariba::utility::StartupWrapper;
54#endif
55
56namespace ariba {
57namespace communication {
58
59using namespace ariba::addressing2;
60
61using ariba::utility::PeerID;
62using ariba::utility::SystemQueue;
63
64use_logging_cpp(BaseCommunication);
65
66
67BaseCommunication::BaseCommunication() 
68        : currentSeqnum( 0 ),
69          transport( NULL ),
70          messageReceiver( NULL ),
71          started( false ),
72          listenOn_endpoints(new addressing2::endpoint_set())
73{
74}
75
76
77BaseCommunication::~BaseCommunication(){
78}
79
80
81void BaseCommunication::start(EndpointSetPtr listen_on) {
82    assert ( ! started );
83   
84    listenOn_endpoints = listen_on;
85    logging_info("Setting local end-points: " << listenOn_endpoints->to_string());
86   
87        logging_info( "Starting up ..." );
88        currentSeqnum = 0;
89
90        // creating transports
91        //  ---> transport_peer holds the set of the active endpoints we're listening on
92        logging_info( "Creating transports ..." );
93        transport = new transport_peer();
94        active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints);
95        logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX
96
97        logging_info( "Searching for local locators ..." );
98        local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints);
99        if ( local_endpoints->count() > 0 )
100        {
101            logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() );
102        }
103        else
104        {
105            logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!");
106           
107            // TODO notify application, so that it may react properly. throw exception..?
108            assert( false );
109        }
110
111       
112    // create local EndpointDescriptor
113        //  ---> localDescriptor hold the set endpoints that can be used to reach us
114    localDescriptor.getPeerId() = PeerID::random();
115    localDescriptor.replace_endpoint_set(local_endpoints);
116    logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
117
118    // start transport_peer
119        transport->register_listener( this );
120        transport->start();
121
122        // bind to the network change detection
123        networkMonitor.registerNotification( this );
124
125        // base comm startup done
126        started = true;
127        logging_info( "Started up." );
128}
129
130void BaseCommunication::stop() {
131        logging_info( "Stopping transports ..." );
132
133        transport->stop();
134        delete transport;
135        started = false;
136
137        logging_info( "Stopped." );
138}
139
140bool BaseCommunication::isStarted(){
141        return started;
142}
143
144const LinkID BaseCommunication::establishLink(
145        const EndpointDescriptor& descriptor,
146        const LinkID& link_id,
147        const QoSParameterSet& qos,
148        const SecurityParameterSet& sec) {
149
150        // copy link id
151        LinkID linkid = link_id;
152
153        // debug
154        logging_debug( "Request to establish link" );
155
156        // create link identifier
157        if (linkid.isUnspecified())     linkid = LinkID::create();
158
159        // create link descriptor
160        logging_debug( "Creating new descriptor entry with local link id=" << linkid.toString() );
161        LinkDescriptor* ld = new LinkDescriptor();
162        ld->localLink = linkid;
163        addLink( ld );
164
165
166    /* send a message to request new link to remote */
167    logging_debug( "Send messages with request to open link to " << descriptor.toString() );
168
169    /*
170         * Create Link-Request Message:
171         * NOTE: - Their PeerID (in parent message)
172         * - Our LinkID
173         * - Our PeerID
174         * - Our EndpointDescriptor
175         */
176        reboost::message_t linkmsg;
177        linkmsg.push_back(linkid.serialize());
178        linkmsg.push_back(localDescriptor.getPeerId().serialize());
179        linkmsg.push_back(localDescriptor.endpoints->serialize());
180       
181//      // XXX AKTUELL BUG FINDING...
182//      reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize();
183//      EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet();
184//      xxx_set->deserialize(xxx);
185//      cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl;
186//      cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl;
187       
188        // send message
189        // TODO move enum to BaseComm
190        send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg,
191                descriptor, system_priority::OVERLAY);
192
193        return linkid;
194}
195
196void BaseCommunication::dropLink(const LinkID link) {
197
198        logging_debug( "Starting to drop link " + link.toString() );
199
200        // see if we have the link
201        LinkDescriptor& ld = queryLocalLink( link );
202        if( ld.isUnspecified() ) {
203                logging_error( "Don't know the link you want to drop "+ link.toString() );
204                return;
205        }
206
207        // tell the registered listeners
208        foreach( CommunicationEvents* i, eventListener ) {
209                i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
210        }
211
212
213        // * send message to drop the link *
214        logging_debug( "Sending out link close request. for us, the link is closed now" );
215        reboost::message_t empty_message;
216        send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY );
217
218        // remove from map
219        removeLink(link);
220}
221
222
223seqnum_t BaseCommunication::sendMessage( const LinkID& lid,
224        reboost::message_t message,
225        uint8_t priority,
226        bool bypass_overlay) throw(communication_message_not_sent)
227{
228    // message type: direct data or (normal) data
229    AribaBaseMsg::type_ type;
230    if ( bypass_overlay )
231    {
232        type = AribaBaseMsg::typeDirectData;
233        logging_debug( "Sending out direct-message to link " << lid.toString() );
234    }
235    else
236    {
237        type = AribaBaseMsg::typeData;
238        logging_debug( "Sending out message to link " << lid.toString() );
239    }
240
241   
242        // query local link info
243        LinkDescriptor& ld = queryLocalLink(lid);
244        if( ld.isUnspecified() )
245        {
246            throw communication_message_not_sent("Don't know the link with id "
247                    + lid.toString());
248        }
249
250        // link not up-> error
251        if( !ld.up )
252        {
253            throw communication_message_not_sent("Can not send on link "
254                    + lid.toString() + ": link not up");
255        }
256
257
258        // * send message *
259        bool okay = send_over_link( type, message, ld, priority );
260
261        if ( ! okay )
262        {
263            throw communication_message_not_sent("send_over_link failed!");
264        }
265       
266        return ++currentSeqnum;
267}
268
269const EndpointDescriptor& BaseCommunication::getEndpointDescriptor(const LinkID link) const {
270        if( link.isUnspecified() ){
271                return localDescriptor;
272        } else {
273                LinkDescriptor& linkDesc = queryLocalLink(link);
274                if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
275                return linkDesc.remoteDescriptor;
276        }
277}
278
279void BaseCommunication::registerEventListener(CommunicationEvents* _events){
280        if( eventListener.find( _events ) == eventListener.end() )
281                eventListener.insert( _events );
282}
283
284void BaseCommunication::unregisterEventListener(CommunicationEvents* _events){
285        EventListenerSet::iterator i = eventListener.find( _events );
286        if( i != eventListener.end() )
287                eventListener.erase( i );
288}
289
290
291
292/*------------------------------
293 | ASIO thread --> SystemQueue |
294 ------------------------------*/
295
296/// ASIO thread
297void BaseCommunication::receive_message(transport_connection::sptr connection,
298        reboost::shared_buffer_t msg) {
299
300        logging_debug( "Dispatching message" );
301       
302    SystemQueue::instance().scheduleCall(
303            boost::bind(
304                    &BaseCommunication::receiveMessage,
305                    this,
306                    connection,
307                    msg)
308        );
309}
310
311/// ASIO thread
312void BaseCommunication::connection_terminated(transport_connection::sptr connection)
313{
314    SystemQueue::instance().scheduleCall(
315            boost::bind(
316                    &BaseCommunication::connectionTerminated,
317                    this,
318                    connection)
319        );
320}
321
322/*--------------------------------
323 | [ASIO thread --> SystemQueue] |
324 -------------------------------*/
325
326/// ARIBA thread (System Queue)
327void BaseCommunication::connectionTerminated(transport_connection::sptr connection)
328{
329    vector<LinkID*> links = connection->get_communication_links();
330   
331    logging_debug("[BaseCommunication] Connection terminated: "
332            << connection->getLocalEndpoint()->to_string()
333            << " <--> " << connection->getRemoteEndpoint()->to_string()
334            << " (" << links.size() << " links)");
335   
336    // remove all links that used the terminated connection
337    for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it )
338    {
339        LinkID& link_id = **it;
340       
341        logging_debug("  ---> Removing link: " << link_id.toString());
342       
343        // searching for link, not found-> warn
344        LinkDescriptor& linkDesc = queryLocalLink( link_id );
345        if (linkDesc.isUnspecified()) {
346            logging_warn("Failed to find local link " << link_id.toString());
347            continue;
348        }
349
350        // inform listeners
351        foreach( CommunicationEvents* i, eventListener ){
352            i->onLinkFail( linkDesc.localLink,
353                    linkDesc.localLocator, linkDesc.remoteLocator );
354        }
355
356        // remove the link descriptor
357        removeLink( link_id );
358    }
359}
360
361/// ARIBA thread (System Queue)
362void BaseCommunication::receiveMessage(transport_connection::sptr connection,
363        reboost::shared_buffer_t message)
364{
365    // XXX
366    logging_debug("/// [receiveMessage] buffersize: " << message.size());
367       
368    // get type
369    uint8_t type = message.data()[0];
370    reboost::shared_buffer_t sub_buff = message(1);
371   
372    // get link id
373    LinkID link_id;
374    if ( type != AribaBaseMsg::typeLinkRequest)
375    {
376        sub_buff = link_id.deserialize(sub_buff);
377    }
378   
379        // handle message
380        switch ( type )
381        {
382                // ---------------------------------------------------------------------
383                // data message
384                // ---------------------------------------------------------------------
385                case AribaBaseMsg::typeData:
386                {
387                        logging_debug( "Received data message, forwarding to overlay." );
388                        if( messageReceiver != NULL )
389                        {
390                                messageReceiver->receiveMessage(
391                                        sub_buff, link_id, NodeID::UNSPECIFIED, false
392                                );
393                        }
394                       
395                        break;
396                }
397
398        // ---------------------------------------------------------------------
399        // direct data message (bypass overlay-layer)
400        // ---------------------------------------------------------------------
401        case AribaBaseMsg::typeDirectData:
402        {
403            logging_debug( "Received direct data message, forwarding to application." );
404           
405            if( messageReceiver != NULL )
406            {
407                messageReceiver->receiveMessage(
408                    sub_buff, link_id, NodeID::UNSPECIFIED, true
409                );
410            }
411           
412            break;
413        }
414
415
416       
417                // ---------------------------------------------------------------------
418                // handle link request from remote
419                // ---------------------------------------------------------------------
420                case AribaBaseMsg::typeLinkRequest:
421                {
422                        logging_debug( "Received link open request on "
423                                << connection->getLocalEndpoint()->to_string() );
424                       
425                        /*
426                         * Deserialize Peer Message
427                         * - Our PeerID
428                         */
429                        PeerID our_peer_id;
430                        sub_buff = our_peer_id.deserialize(sub_buff);
431
432            /// not the correct peer id-> skip request
433            if ( our_peer_id != localDescriptor.getPeerId() &&
434                    ! our_peer_id.isUnspecified() /* overlay bootstrap */ )
435            {
436                logging_info("Received link request for "
437                    << our_peer_id.toString()
438                    << "but i'm "
439                    << localDescriptor.getPeerId()
440                    << ": Ignoring!");
441               
442                // TODO terminate connection?
443               
444                break;
445            }
446
447           
448                    /*
449                     * Deserialize Link-Request Message:
450                     * - Their LinkID
451                     * - Their PeerID
452                     * - Their EndpointDescriptor
453                     */
454            LinkID their_link_id;
455            PeerID their_peer_id;
456            EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
457            sub_buff = their_link_id.deserialize(sub_buff);
458            sub_buff = their_peer_id.deserialize(sub_buff);
459            sub_buff = their_endpoints->deserialize(sub_buff);
460            /* [ Deserialize Link-Request Message ] */
461
462           
463                        /// only answer the first request
464                        if (!queryRemoteLink(their_link_id).isUnspecified())
465                        {
466                           
467                            // TODO aktuell: When will these connections be closed?
468                            // ---> Close it now (if it services no links) ?
469                            //   (see also ! allowlink below)
470                           
471                            // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only!
472                            if ( connection->get_communication_links().size() == 0 )
473                            {
474                                connection->terminate();
475                            }
476                           
477                                logging_debug("Link request already received. Ignore!");
478                                break;
479                        }
480
481                        /// create link ids
482                        LinkID localLink  = LinkID::create();
483                        LinkID remoteLink = their_link_id;  // XXX intermediate variable is unnecessary
484                        logging_debug(
485                                "local=" << connection->getLocalEndpoint()->to_string()
486                                << " remote=" << connection->getRemoteEndpoint()->to_string()
487                        );
488
489                        // check if link creation is allowed by ALL listeners
490                        bool allowlink = true;
491                        foreach( CommunicationEvents* i, eventListener ){
492                                allowlink &= i->onLinkRequest( localLink,
493                                        connection->getLocalEndpoint(),
494                                        connection->getRemoteEndpoint());
495                        }
496
497                        // not allowed-> warn
498                        if( !allowlink ){
499                                logging_warn( "Overlay denied creation of link" );
500                                return;
501                        }
502
503                        // create descriptor
504                        LinkDescriptor* ld = new LinkDescriptor();
505                        ld->localLink = localLink;
506                        ld->remoteLink = remoteLink;
507                        ld->localLocator = connection->getLocalEndpoint();
508                        ld->remoteLocator = connection->getRemoteEndpoint();
509                        ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints);
510                        ld->set_connection(connection);
511
512           
513                        // update endpoints (should only have any effect in case of NAT)
514                        ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
515//                      localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint());  // XXX 0.0.0.0:0
516
517                        // link is now up-> add it
518                        ld->up = true;
519                        addLink(ld);
520
521
522                       
523            /* sending link reply */
524            logging_debug( "Sending link reply with ids "
525                << "local=" << localLink.toString() << ", "
526                << "remote=" << remoteLink.toString() );
527
528                    /*
529                     * Create Link-Reply Message:
530                     * - Our LinkID
531                     * - Our Endpoint_Set (as update)
532                     * - Their EndpointDescriptor (maybe they learn something about NAT)
533                     */
534                    reboost::message_t linkmsg;
535                    linkmsg.push_back(localLink.serialize());
536                    linkmsg.push_back(localDescriptor.endpoints->serialize());
537                    linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize());
538                   
539                    // XXX
540                    cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl;
541                   
542                    // send message
543                        bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY );
544                       
545                        if ( ! sent )
546                        {
547                            logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string());
548                           
549                            // TODO remove link, close link, ..?
550                           
551                            break;
552                        }
553
554
555            // link is up!
556            logging_debug( "Link (initiated from remote) is up with "
557                << "local(id=" << ld->localLink.toString() << ","
558                << "locator=" << ld->localLocator->to_string() << ") "
559                << "remote(id=" << ld->remoteLink.toString() << ", "
560                << "locator=" << ld->remoteLocator->to_string() << ")"
561            );
562
563                        // inform listeners about new open link
564                        foreach( CommunicationEvents* i, eventListener ) {
565                                i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
566                        }
567
568                        // done
569                        break;
570                }
571
572                // ---------------------------------------------------------------------
573                // handle link request reply
574                // ---------------------------------------------------------------------
575                case AribaBaseMsg::typeLinkReply:
576                {
577                        logging_debug( "Received link open reply for a link we initiated" );
578
579            /*
580             * Deserialize Link-Reply Message:
581             * - Their LinkID
582             * - Their Endpoint_Set (as update)
583             * - Our EndpointDescriptor (maybe we can learn something about NAT)
584             */
585                        LinkID their_link_id;
586                        EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
587                        EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet();
588                        sub_buff = their_link_id.deserialize(sub_buff);
589                        sub_buff = their_endpoints->deserialize(sub_buff);
590                        sub_buff = our_endpoints->deserialize(sub_buff);
591
592           
593                        // this is a reply to a link open request, so we have already
594                        // a link mapping and can now set the remote link to valid
595                        LinkDescriptor& ld = queryLocalLink( link_id );
596
597                        // no link found-> warn!
598                        if (ld.isUnspecified()) {
599                                logging_warn("Failed to find local link " << link_id.toString());
600                                return;
601                        }
602                       
603                        if ( ld.up )
604                        {
605                logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString());
606               
607                // TODO send LinkClose ?
608                return;                     
609                        }
610
611                        // store the connection
612                        ld.set_connection(connection);
613                       
614                        // set remote locator and link id
615                        ld.remoteLink = their_link_id;
616                        ld.remoteLocator = connection->getRemoteEndpoint();
617                       
618                       
619                        /* Update endpoints */
620                        // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information.
621                        ld.remoteDescriptor.replace_endpoint_set(their_endpoints);
622                       
623                        // add actual remote endpoint to this set (should only have any effect in case of NAT)
624                        ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
625                       
626                        // TODO In case of NAT, we could learn something about our external IP.
627                        //   ---> But we must trust the remote peer about this information!! 
628//                      localDescriptor.endpoints->add_endpoints(our_endpoints);
629                       
630                       
631                       
632                       
633                        ld.up = true;
634
635                        logging_debug( "Link is now up with local id "
636                                << ld.localLink.toString() << " and remote id "
637                                << ld.remoteLink.toString() );
638
639
640                        // inform lisneters about link up event
641                        foreach( CommunicationEvents* i, eventListener ){
642                                i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
643                        }
644
645                        // done
646                        break;
647                }
648
649                // ---------------------------------------------------------------------
650                // handle link close requests
651                // ---------------------------------------------------------------------
652                case AribaBaseMsg::typeLinkClose: {
653                        // get remote link
654//                      const LinkID& localLink = msg.getRemoteLink();
655                        logging_debug( "Received link close request for link " << link_id.toString() );
656
657                        // searching for link, not found-> warn
658                        LinkDescriptor& linkDesc = queryLocalLink( link_id );
659                        if (linkDesc.isUnspecified()) {
660                                logging_warn("Failed to find local link " << link_id.toString());
661                                return;
662                        }
663
664                        // inform listeners
665                        foreach( CommunicationEvents* i, eventListener ){
666                                i->onLinkDown( linkDesc.localLink,
667                                                linkDesc.localLocator, linkDesc.remoteLocator );
668                        }
669
670                        // remove the link descriptor
671                        removeLink( link_id );
672
673                        // done
674                        break;
675                }
676
677                // ---------------------------------------------------------------------
678                // handle link locator changes  -- TODO is this ever called..?
679                // ---------------------------------------------------------------------
680//              case AribaBaseMsg::typeLinkUpdate: {
681//                      const LinkID& localLink = msg.getRemoteLink();
682//                      logging_debug( "Received link update for link "
683//                              << localLink.toString() );
684//
685//                      // find the link description
686//                      LinkDescriptor& linkDesc = queryLocalLink( localLink );
687//                      if (linkDesc.isUnspecified()) {
688//                              logging_warn("Failed to update local link "
689//                                      << localLink.toString());
690//                              return;
691//                      }
692//
693//                      // update the remote locator
694//                      addressing2::EndpointPtr oldremote = linkDesc.remoteLocator;
695//                      linkDesc.remoteLocator = connection->getRemoteEndpoint();
696//                     
697//                      // TODO update linkDesc.connection ?
698//
699//                      // inform the listeners (local link has _not_ changed!)
700//                      foreach( CommunicationEvents* i, eventListener ){
701//                              i->onLinkChanged(
702//                                      linkDesc.localLink,     // linkid
703//                                      linkDesc.localLocator,  // old local
704//                                      linkDesc.localLocator,  // new local
705//                                      oldremote,                      // old remote
706//                                      linkDesc.remoteLocator  // new remote
707//                              );
708//                      }
709//
710//                      // done
711//                      break;
712//              }
713               
714               
715        default: {
716            logging_warn( "Received unknown message type!" );
717            break;
718        }
719
720        }
721}
722
723/// add a newly allocated link to the set of links
724void BaseCommunication::addLink( LinkDescriptor* link ) {
725        linkSet.push_back( link );
726}
727
728/// remove a link from set
729void BaseCommunication::removeLink( const LinkID& localLink ) {
730        for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
731                if( (*i)->localLink != localLink) continue;
732//              remove_endpoint((*i)->remoteLocator);  // XXX
733                delete *i;
734                linkSet.erase( i );
735                break;
736        }
737}
738
739/// query a descriptor by local link id
740BaseCommunication::LinkDescriptor& BaseCommunication::queryLocalLink( const LinkID& link ) const {
741        for (size_t i=0; i<linkSet.size();i++)
742                if (linkSet[i]->localLink == link) return (LinkDescriptor&)*linkSet[i];
743
744        return LinkDescriptor::UNSPECIFIED();
745}
746
747/// query a descriptor by remote link id
748BaseCommunication::LinkDescriptor& BaseCommunication::queryRemoteLink( const LinkID& link ) const {
749        for (size_t i=0; i<linkSet.size();i++)
750                if (linkSet[i]->remoteLink == link) return (LinkDescriptor&)*linkSet[i];
751
752        return LinkDescriptor::UNSPECIFIED();
753}
754
755//LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
756//      LinkIDs ids;
757//      for (size_t i=0; i<linkSet.size(); i++){
758//              if( addr == NULL ){
759//                      ids.push_back( linkSet[i]->localLink );
760//              } else {
761//                      if ( *linkSet[i]->remoteLocator == *addr )
762//                              ids.push_back( linkSet[i]->localLink );
763//              }
764//      }
765//      return ids;
766//}
767
768void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
769
770/*- disabled!
771
772        // we only care about address changes, not about interface changes
773        // as address changes are triggered by interface changes, we are safe here
774        if( info.type != NetworkChangeInterface::EventTypeAddressNew &&
775                info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
776
777        logging_info( "base communication is handling network address changes" );
778
779        // get all now available addresses
780        NetworkInformation networkInformation;
781        AddressInformation addressInformation;
782
783        NetworkInterfaceList interfaces = networkInformation.getInterfaces();
784        AddressList addresses;
785
786        for( NetworkInterfaceList::iterator i = interfaces.begin(); i != interfaces.end(); i++ ){
787                AddressList newaddr = addressInformation.getAddresses(*i);
788                addresses.insert( addresses.end(), newaddr.begin(), newaddr.end() );
789        }
790
791        //
792        // get current locators for the local endpoint
793        // TODO: this code is dublicate of the ctor code!!! cleanup!
794        //
795
796        NetworkProtocol::NetworkLocatorSet locators = network->getAddresses();
797        NetworkProtocol::NetworkLocatorSet::iterator i = locators.begin();
798        NetworkProtocol::NetworkLocatorSet::iterator iend = locators.end();
799
800        //
801        // remember the old local endpoint, in case it changes
802        //
803
804        EndpointDescriptor oldLocalDescriptor( localDescriptor );
805
806        //
807        // look for local locators that we can use in communication
808        //
809        // choose the first locator that is not localhost
810        //
811
812        bool foundLocator = false;
813        bool changedLocator = false;
814
815        for( ; i != iend; i++){
816                logging_debug( "local locator found " << (*i)->toString() );
817                IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
818
819                if( *ipv4locator != IPv4Locator::LOCALHOST &&
820                    *ipv4locator != IPv4Locator::ANY       &&
821                    *ipv4locator != IPv4Locator::BROADCAST  ){
822
823                        ipv4locator->setPort( listenport );
824                        changedLocator = *localDescriptor.locator != *ipv4locator;
825                        localDescriptor.locator = ipv4locator;
826                        logging_info( "binding to addr = " << ipv4locator->toString() );
827                        foundLocator = true;
828                        break;
829                }
830        } // for( ; i != iend; i++)
831
832        //
833        // if we found no locator, bind to localhost
834        //
835
836        if( !foundLocator ){
837                changedLocator = *localDescriptor.locator != IPv4Locator::LOCALHOST;
838                localDescriptor.locator = new IPv4Locator( IPv4Locator::LOCALHOST );
839                ((IPv4Locator*)(localDescriptor.locator))->setPort( listenport );
840                logging_info( "found no good local lcoator, binding to addr = " <<
841                                                localDescriptor.locator->toString() );
842        }
843
844        //
845        // if we have connections that have no more longer endpoints
846        // close these. they will be automatically built up again.
847        // also update the local locator in the linkset mapping
848        //
849
850        if( changedLocator ){
851
852                logging_debug( "local endp locator has changed to " << localDescriptor.toString() <<
853                                ", resettings connections that end at old locator " <<
854                                        oldLocalDescriptor.toString());
855
856                LinkSet::iterator i = linkSet.begin();
857                LinkSet::iterator iend = linkSet.end();
858
859                for( ; i != iend; i++ ){
860
861                        logging_debug( "checking connection for locator change: " <<
862                                        " local " << (*i).localLocator->toString() <<
863                                        " old " << oldLocalDescriptor.locator->toString() );
864
865                        if( *((*i).localLocator) == *(oldLocalDescriptor.locator) ){
866
867                                logging_debug("terminating connection to " << (*i).remoteLocator->toString() );
868                                transport->terminate( oldLocalDescriptor.locator, (*i).remoteLocator );
869
870                                (*i).localLocator = localDescriptor.locator;
871                        }
872                } // for( ; i != iend; i++ )
873
874                // wait 500ms to give the sockets time to shut down
875                usleep( 500000 );
876
877        } else {
878
879                logging_debug( "locator has not changed, not resetting connections" );
880
881        }
882
883        //
884        // handle the connections that have no longer any
885        // valid locator. send update messages with the new
886        // locator,  so the remote node updates its locator/link mapping
887        //
888
889        LinkSet::iterator iAffected = linkSet.begin();
890        LinkSet::iterator endAffected = linkSet.end();
891
892        for( ; iAffected != endAffected; iAffected++ ){
893                LinkDescriptor descr = *iAffected;
894                logging_debug( "sending out link locator update to " << descr.remoteLocator->toString() );
895
896                AribaBaseMsg updateMsg(         descr.remoteLocator,
897                                                AribaBaseMsg::LINK_STATE_UPDATE,
898                                                descr.localLink, descr.remoteLink );
899
900                transport->sendMessage( &updateMsg );
901        }
902*/
903}
904
905
906addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link(
907        const LinkID& linkid)
908{
909    LinkDescriptor& ld = queryLocalLink(linkid);
910   
911    return ld.get_connection()->getLocalEndpoint();
912}
913
914addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link(
915        const LinkID& linkid)
916{
917    LinkDescriptor& ld = queryLocalLink(linkid);
918   
919    return ld.get_connection()->getRemoteEndpoint();
920}
921
922
923
924bool BaseCommunication::send_over_link(
925        const uint8_t type,
926        reboost::message_t message,
927        const LinkDescriptor& desc,
928        const uint8_t priority)
929{
930    /*
931     * Create Link Message:
932     * - Type
933     * - Their LinkID
934     */
935    // link id
936    message.push_front(desc.remoteLink.serialize());
937    // type
938    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
939    /* [ Create Link Message ] */
940   
941   
942    /* send message */
943    transport_connection::sptr conn = desc.get_connection();
944    if ( ! conn )
945    {
946        cout << "/// MARIO: No connection!!" << endl;  // XXX debug
947        return false;
948    }
949   
950    // * send over connection *
951    return conn->send(message, priority);
952}
953
954void BaseCommunication::send_to_peer(
955        const uint8_t type,
956        const PeerID& peer_id,
957        reboost::message_t message,
958        const EndpointDescriptor& endpoint,
959        const uint8_t priority )
960{
961    /*
962     * Create Peer Message:
963     * - Type
964     * - Their PeerID
965     */
966    // peer id
967    message.push_front(peer_id.serialize());
968    // type
969    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
970   
971   
972    /* send message */
973    transport->send(endpoint.getEndpoints(), message, priority);
974}
975
976
977
978
979}} // namespace ariba, communication
Note: See TracBrowser for help on using the repository browser.