Changeset 2483


Ignore:
Timestamp:
Feb 24, 2009, 10:06:43 PM (16 years ago)
Author:
Christoph Mayer
Message:

-autolinks impl. (funktioniert noch nicht komplett, macht aber im moment nichts schlechter)
-einige fixes im ablauf etc.

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • etc/pingpong/settings_initiator.cnf

    r2454 r2483  
    44ariba.tcp.port = 5002
    55#ariba.udp.port =
    6 ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})}
     6#ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})}
  • etc/pingpong/settings_node2.cnf

    r2454 r2483  
    22node.initiator = false
    33#ariba.ip.addr =
    4 ariba.tcp.port = 5010
     4ariba.tcp.port = 5004
    55#ariba.udp.port =
    66ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})}
  • sample/pingpong/PingPong.cpp

    r2473 r2483  
    1717// construction
    1818PingPong::PingPong() : pingId( 0 ) {
     19        Timer::setInterval( 5000 );
    1920}
    2021
     
    7071
    7172        // bind communication and node listener
    72         node->bind(this);
    73         node->bind(this, PingPong::PINGPONG_ID);
     73        node->bind( this );
     74        node->bind( this, PingPong::PINGPONG_ID);
    7475       
     76        // start the ping timer. if we are not
     77        // the initiator this will happen in onJoinCompleted
     78        if( isInitiator ) Timer::start();
     79
    7580        // ping pong started up...
    7681        logging_info( "pingpong started up ");
     
    8590        Timer::stop();
    8691
    87         // unbind listeners
     92        // unbind communication and node listener
    8893        node->unbind( this );
    8994        node->unbind( this, PingPong::PINGPONG_ID );
     
    108113
    109114        // start the timer to ping every second
    110         Timer::setInterval( 1000 );
    111115        Timer::start();
    112116}
     
    117121
    118122// communication listener
    119 bool PingPong::onLinkRequest(const NodeID& remote, Message* msg) {
     123bool PingPong::onLinkRequest(const NodeID& remote, const DataMessage& msg) {
    120124        return false;
    121125}
    122126
    123 void PingPong::onMessage(Message* msg, const NodeID& remote,
    124                 const LinkID& lnk = LinkID::UNSPECIFIED) {
     127void PingPong::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk) {
    125128
    126         PingPongMessage* pingmsg = msg->decapsulate<PingPongMessage> ();
     129        PingPongMessage* pingmsg = msg.getMessage()->decapsulate<PingPongMessage> ();
    127130
    128131        logging_info( "received ping message on link " << lnk.toString()
    129132                        << " from node " << remote.toString()
    130                         << ": " << pingmsg->toString() );
     133                        << ": " << pingmsg->info() );
    131134}
    132135
     
    142145        PingPongMessage pingmsg( pingId );
    143146        node->sendBroadcastMessage( pingmsg, PingPong::PINGPONG_ID );
    144 
    145147}
    146148
  • sample/pingpong/PingPong.h

    r2473 r2483  
    3535protected:
    3636        // communication listener interface
    37         virtual bool onLinkRequest(const NodeID& remote, Message* msg);
    38         virtual void onMessage(Message* msg, const NodeID& remote, const LinkID& lnk);
     37        virtual bool onLinkRequest(const NodeID& remote, const DataMessage& msg);
     38        virtual void onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk= LinkID::UNSPECIFIED);
    3939
    4040        // node listener interface
     
    6262        // the current ping id
    6363        unsigned long pingId;
     64
    6465};
    6566
  • source/ariba/communication/BaseCommunication.cpp

    r2482 r2483  
    8585
    8686        for( ; i != iend; i++){
    87                 logging_debug( "local locator found " + (*i)->toString() );
     87                logging_debug( "local locator found " << (*i)->toString() );
    8888                IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
    8989
     
    101101                        localDescriptor.locator = ipv4locator;
    102102                        localDescriptor.isUnspec = false;
    103                         logging_info( "binding to addr = " + ipv4locator->toString() );
     103                        logging_info( "binding to addr = " << ipv4locator->toString() );
    104104                        foundLocator = true;
    105105                        break;
     
    180180        LinkID linkid = LinkID::create();
    181181
    182         logging_debug( "creating new local descriptor entry with local link id " + linkid.toString() );
    183         LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor );
     182        logging_debug( "creating new local descriptor entry with local link id " << linkid.toString() );
     183        LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor, false );
    184184        addLink( linkDescriptor );
    185185
     
    189189        //
    190190
    191         logging_debug( "sending out base messages with request to open link to " + remote->toString() );
     191        logging_debug( "sending out base messages with request to open link to " << remote->toString() );
    192192        AribaBaseMsg baseMsg( remote, AribaBaseMsg::LINK_STATE_OPEN_REQUEST, linkid,
    193193                                                                LinkID::UNSPECIFIED );
     
    206206                logging_error( "don't know the link you want to drop" );
    207207                return;
     208        }
     209
     210        // warn if this link has some queued messages attached
     211        if( descriptor.waitingmsg.size() > 0 ){
     212                logging_warn( "dropping link " << link.toString() <<
     213                        " that has " << descriptor.waitingmsg.size() << " waiting messages" );
    208214        }
    209215
     
    231237seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
    232238
    233         logging_debug( "sending out message to link " + lid.toString() );
     239        logging_debug( "sending out message to link " << lid.toString() );
    234240
    235241        // query local link info
    236242        LinkDescriptor& linkDesc = queryLocalLink(lid);
    237243        if( linkDesc.isUnspecified() ){
    238                 logging_error( "don't know the link with id " + lid.toString() );
    239                 return 0;
     244                logging_error( "don't know the link with id " << lid.toString() );
     245                return -1;
    240246        }
    241247
     
    251257        msg.encapsulate( const_cast<Message*>(message) );
    252258
    253         // send message
    254         transport->sendMessage( &msg );
    255 
    256         return ++currentSeqnum;
     259        if( linkDesc.linkup ){
     260
     261                // send message
     262                transport->sendMessage( &msg );
     263                return ++currentSeqnum;
     264
     265        } else {
     266
     267                // queue message
     268                logging_info( "link " << lid.toString() << " is not up yet, queueing message" );
     269                linkDesc.waitingmsg.push_back( new Message(msg) ); // TODO ooooo
     270
     271                return 0;
     272
     273        } // if( linkDesc.linkup )
    257274}
    258275
     
    300317
    301318        AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate<AribaBaseMsg>();
    302         logging_debug( "receiving base comm message of type " + spovmsg->getTypeString() );
     319        logging_debug( "receiving base comm message of type " << spovmsg->getTypeString() );
    303320
    304321        //
     
    351368                const NetworkLocator* remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress());
    352369
    353                 logging_debug( "localLocator=" + localLocator->toString() + " remoteLocator="+remoteLocator->toString());
     370                logging_debug( "localLocator=" << localLocator->toString()
     371                                << " remoteLocator=" << remoteLocator->toString());
    354372
    355373                // ask the registered listeners if this link
     
    372390
    373391                LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink,
    374                                         remoteLocator, EndpointDescriptor(remoteLocator));
     392                                        remoteLocator, EndpointDescriptor(remoteLocator), true);
    375393
    376394                logging_debug( "saving new link descriptor with " <<
     
    378396                                "[local locator " << localLocator->toString() << "] " <<
    379397                                "[remote link " << remoteLink.toString() << "] " <<
    380                                 "[remote locator " << remoteLocator->toString() << "]" );
     398                                "[remote locator " << remoteLocator->toString() << "]" <<
     399                                "[link up true]" );
    381400
    382401                addLink( linkDescriptor );
     
    400419                // the link is now open
    401420                //
    402 
     421               
    403422                BOOST_FOREACH( CommunicationEvents* i, eventListener ){
    404423                        i->onLinkUp( localLink, localLocator, remoteLocator );
     
    420439
    421440                if (linkDesc.isUnspecified()) {
    422                         logging_warn("Failed to find local link "+spovmsg->getRemoteLink().toString());
     441                        logging_warn("Failed to find local link " << spovmsg->getRemoteLink().toString());
    423442                        return false;
    424443                }
    425444
    426445                linkDesc.remoteLink = spovmsg->getLocalLink();
    427                 logging_debug( "the link is now up with local link id " + spovmsg->getRemoteLink().toString() );
     446                linkDesc.linkup = true;
     447       
     448                logging_debug( "the link is now up with local link id " << spovmsg->getRemoteLink().toString() );
    428449
    429450                BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     
    431452                }
    432453
     454                if( linkDesc.waitingmsg.size() > 0 ){
     455                        logging_info( "sending out queued messages on link " << linkDesc.localLink.toString() );
     456
     457                        BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){
     458                                sendMessage( linkDesc.localLink, msg );
     459                                delete msg;
     460                        }
     461
     462                        linkDesc.waitingmsg.clear();
     463                }
     464
    433465        } // LINK_STATE_OPEN_REPLY
    434466
     
    440472
    441473                const LinkID& localLink = spovmsg->getRemoteLink();
    442                 logging_debug( "received link close request for link " + localLink.toString() );
     474                logging_debug( "received link close request for link " << localLink.toString() );
    443475
    444476                //
     
    450482                LinkDescriptor& linkDesc = queryLocalLink( localLink );
    451483                if (linkDesc.isUnspecified()) {
    452                         logging_warn("Failed to find local link "+localLink.toString());
     484                        logging_warn("Failed to find local link " << localLink.toString());
    453485                        return false;
    454486                }
     
    469501
    470502                const LinkID& localLink = spovmsg->getRemoteLink();
    471                 logging_debug( "received link update for link " + localLink.toString() );
     503                logging_debug( "received link update for link " << localLink.toString() );
    472504
    473505                //
     
    477509                LinkDescriptor& linkDesc = queryLocalLink( localLink );
    478510                if (linkDesc.isUnspecified()) {
    479                         logging_warn("Failed to update local link "+localLink.toString());
     511                        logging_warn("Failed to update local link " << localLink.toString());
    480512                        return false;
    481513                }
     
    517549
    518550        for( ; i != iend; i++){
    519                 if( (*i).localLink == localLink){
    520                         linkSet.erase( i );
    521                         break;
    522                 }
     551                if( (*i).localLink != localLink) continue;
     552               
     553                BOOST_FOREACH( Message* msg, i->waitingmsg ){
     554                        delete msg;
     555                }
     556
     557                i->waitingmsg.clear();
     558                linkSet.erase( i );
     559
     560                break;
    523561        }
    524562}
     
    568606                info.type != NetworkChangeInterface::EventTypeAddressDelete ) return;
    569607
    570         logging_info("base communication is handling network address changes");
     608        logging_info( "base communication is handling network address changes" );
    571609
    572610        //
     
    610648
    611649        for( ; i != iend; i++){
    612                 logging_debug( "local locator found " + (*i)->toString() );
     650                logging_debug( "local locator found " << (*i)->toString() );
    613651                IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i);
    614652
  • source/ariba/communication/BaseCommunication.h

    r2482 r2483  
    4646#include <vector>
    4747#include <iostream>
     48#include <deque>
    4849#include <algorithm>
    4950#include <boost/foreach.hpp>
     
    7778using std::map;
    7879using std::vector;
     80using std::deque;
    7981using std::pair;
    8082using std::make_pair;
     
    222224                        remoteLink(),
    223225                        remoteLocator(NULL),
    224                         remoteEndpoint(EndpointDescriptor::UNSPECIFIED){
     226                        remoteEndpoint(EndpointDescriptor::UNSPECIFIED),
     227                        linkup(false) {
    225228                }
    226229
    227230                LinkDescriptor(const LinkID& _localLink, const NetworkLocator*& _localLocator,
    228231                                const LinkID& _remoteLink, const NetworkLocator*& _remoteLocator,
    229                                 const EndpointDescriptor& _remoteEndpoint ) :
     232                                const EndpointDescriptor& _remoteEndpoint, bool _linkup ) :
    230233                        localLink(_localLink),
    231234                        localLocator(_localLocator),
    232235                        remoteLink(_remoteLink),
    233236                        remoteLocator(_remoteLocator),
    234                         remoteEndpoint(_remoteEndpoint){
     237                        remoteEndpoint(_remoteEndpoint),
     238                        linkup(_linkup) {
    235239                }
    236240
     
    240244                        remoteLink(desc.remoteLink),
    241245                        remoteLocator(desc.remoteLocator),
    242                         remoteEndpoint(desc.remoteEndpoint){
     246                        remoteEndpoint(desc.remoteEndpoint),
     247                        linkup(desc.linkup) {
     248
     249                        BOOST_FOREACH( Message* msg, desc.waitingmsg ){
     250                                waitingmsg.push_back( msg );
     251                        }
    243252                }
    244253
     
    252261                const NetworkLocator*   remoteLocator;
    253262                EndpointDescriptor      remoteEndpoint;
     263
     264                bool                    linkup;
     265                deque<Message*>         waitingmsg;
    254266        };
    255267
  • source/ariba/overlay/BaseOverlay.cpp

    r2473 r2483  
    6969//              ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
    7070//      }
     71
     72        // timer for auto link management
     73        Timer::setInterval( 5000 );
     74        Timer::start();
    7175}
    7276
     
    7579        logging_info("deleting base overlay");
    7680
     81        Timer::stop();
    7782        bc.unregisterMessageReceiver( this );
    7883        bc.unregisterEventListener( this );
     
    147152        //
    148153        // create the overlay
    149         // and bootstrap against ourselfs
    150154        //
    151155
    152156        overlayInterface->createOverlay();
     157        BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
     158                i->onOverlayCreate( spovnetId );
     159        }
     160
     161        //
     162        // bootstrap against ourselfs
     163        //
     164
    153165        overlayInterface->joinOverlay();
     166        BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
     167                i->onJoinSuccess( spovnetId );
     168        }
    154169
    155170        ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
    156171        ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
    157 
    158         // inform all registered services of the event
    159         BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
    160                 i->onOverlayCreate( spovnetId );
    161         }
    162172}
    163173
     
    245255        overmsg.encapsulate( const_cast<Message*>(message) );
    246256
     257        i->second.markused();
    247258        return bc.sendMessage( link, &overmsg );
    248259}
     
    263274
    264275        if( link == LinkID::UNSPECIFIED ){
    265                 logging_error( "no link could be found to send message to node " <<
    266                                 node.toString() << " for service " << service.toString() );
    267                 return -1;
    268         }
    269 
     276
     277                logging_info( "no link could be found to send message to node " <<
     278                                node.toString() << " for service " << service.toString() <<
     279                                ". creating auto link ...");
     280               
     281                const LinkID link = establishLink( node, service );
     282                LinkMapping::iterator i = linkMapping.find( link );
     283
     284                if( i == linkMapping.end() ){
     285                        logging_error( "failed to establish auto link to node " << node.toString() <<
     286                                        " for service " << service.toString() );
     287                        return -1;
     288                }
     289
     290                i->second.autolink = true;
     291
     292        } // if( link != LinkID::UNSPECIFIED )
     293
     294        i->second.markused();
    270295        return sendMessage( message, link );
    271296}
     
    352377                                " on link " << id.toString() );
    353378
    354                 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
     379                OverlayMsg overMsg(
     380                        OverlayMsg::OverlayMessageTypeUpdate,
     381                        i->second.service,
     382                        nodeId
     383                        );
     384
    355385                bc.sendMessage( id, &overMsg );
     386                i->second.markused();
    356387
    357388        } // if( i == linkMapping.end() )
     
    397428        if( i->second.interface != NULL )
    398429                i->second.interface->onLinkChanged( id, nodeId, i->second.node );
     430
     431        i->second.markused();
    399432}
    400433
     
    412445        if( i->second.interface != NULL )
    413446                i->second.interface->onLinkFail( id, nodeId, i->second.node );
     447
     448        i->second.markused();
    414449}
    415450
     
    427462        if( i->second.interface != NULL )
    428463                i->second.interface->onLinkQoSChanged( id, nodeId, i->second.node, qos );
     464
     465        i->second.markused();
    429466}
    430467
     
    434471        OverlayMsg* overlayMsg = ((Message*)message)->decapsulate<OverlayMsg>();
    435472        if( overlayMsg == NULL ) return false;
     473
     474        // mark the link as in action
     475        LinkMapping::iterator item = linkMapping.find( link );
     476        if( item != linkMapping.end() ) item->second.markused();
    436477
    437478        //
     
    683724                i->second.interface = iface;
    684725                iface->onLinkUp( link, nodeId, sourcenode );
     726                i->second.markused();
    685727
    686728                return true ;
     
    828870}
    829871
     872void BaseOverlay::eventFunction(){
     873
     874        list<LinkID> oldlinks;
     875        time_t now = time(NULL);
     876
     877        // first gather all the links from linkMapping that need droppin
     878        // don't directly drop, as the dropLink function affects the
     879        // linkMapping structure that we are traversing here.
     880        // drop links after a timeout of 30s
     881
     882        // the macro gets confused if type is passed directly
     883        // because of the comma in the pair definition
     884        typedef pair<LinkID,LinkItem> pairitem;
     885
     886        BOOST_FOREACH( pairitem item, linkMapping ){
     887                if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
     888                        oldlinks.push_back( item.first );
     889        }
     890
     891        BOOST_FOREACH( const LinkID lnk, oldlinks ){
     892                dropLink( lnk );
     893        }
     894}
     895
    830896}} // namespace ariba, overlay
  • source/ariba/overlay/BaseOverlay.h

    r2473 r2483  
    4343#include <iostream>
    4444#include <algorithm>
     45#include <ctime>
    4546#include <boost/foreach.hpp>
    4647
     
    5051#include "ariba/utility/misc/Demultiplexer.hpp"
    5152#include "ariba/utility/logging/Logging.h"
     53#include "ariba/utility/system/Timer.h"
    5254
    5355#include "ariba/communication/EndpointDescriptor.h"
     
    6769using std::map;
    6870using std::make_pair;
     71using std::pair;
    6972
    7073using ariba::communication::EndpointDescriptor;
     
    9295using ariba::utility::MessageSender;
    9396using ariba::utility::seqnum_t;
     97using ariba::utility::Timer;
    9498
    9599#define ovl OvlVis::instance()
     
    109113        public MessageReceiver,
    110114        public CommunicationEvents,
    111         public OverlayStructureEvents {
     115        public OverlayStructureEvents,
     116        protected Timer {
    112117
    113118        use_logging_h( BaseOverlay );
     
    248253        virtual void onNodeJoin( const NodeID& node );
    249254
     255        // for timer events
     256        virtual void eventFunction();
     257
    250258private:
    251259
     
    316324                static const LinkItem UNSPECIFIED;
    317325
    318                 LinkItem( const LinkID& _link, const NodeID& _node,
     326                LinkItem( const LinkID& _link, const NodeID& _node, 
    319327                                const ServiceID& _service, ServiceInterface* _interface )
    320                         : link( _link ), node( _node ), service( _service ), interface( _interface ){
     328                        : link( _link ), node( _node ), service( _service ), interface( _interface ),
     329                                autolink( false ), lastuse( time(NULL) ) {
    321330                }
     331
     332                // general information about the link
    322333
    323334                const LinkID link;
     
    325336                ServiceID service;
    326337                ServiceInterface* interface;
     338
     339                // information needed for auto links
     340
     341                void markused(){
     342                        lastuse = time(NULL);
     343                }
     344
     345                bool autolink;
     346                time_t lastuse;
    327347        };
    328348
Note: See TracChangeset for help on using the changeset viewer.