Changeset 3712 for source


Ignore:
Timestamp:
May 26, 2009, 8:03:39 PM (15 years ago)
Author:
Christoph Mayer
Message:

-autolink message buffer von basecomm nach baseoverlay und komplette logik hochgezogen damit mit neuem overlay routing zurecht kommt, damit ist letzer fehler raus

Location:
source/ariba
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/communication/BaseCommunication.cpp

    r3699 r3712  
    217217        }
    218218
    219         // warn if this link has some queued messages attached
    220         if( descriptor.waitingmsg.size() > 0 ){
    221                 logging_warn( "dropping link " << link.toString() <<
    222                         " that has " << descriptor.waitingmsg.size() << " waiting messages" );
    223         }
    224 
    225219        // create message to drop the link
    226220        logging_debug( "sending out link close request. for us, the link is closed now" );
     
    266260        msg.encapsulate( const_cast<Message*>(message) );
    267261
    268         if( linkDesc.linkup ){
    269 
    270                 // send message
    271                 transport->sendMessage( &msg );
    272                 return ++currentSeqnum;
    273 
    274         } else {
    275 
    276                 // queue message
    277                 logging_info( "link " << lid.toString() << " is not up yet, queueing message" );
    278                 linkDesc.waitingmsg.push_back( new Message(msg) ); // TODO ooooo
    279 
    280                 return 0;
    281 
    282         } // if( linkDesc.linkup )
     262        if( !linkDesc.linkup ){
     263                logging_error("cant send message on link " << lid.toString() << ", link not up");
     264                return -1;
     265        }
     266
     267        // send message
     268        transport->sendMessage( &msg );
     269        return ++currentSeqnum;
    283270}
    284271
     
    466453                }
    467454
    468                 if( linkDesc.waitingmsg.size() > 0 ){
    469                         logging_info( "sending out queued messages on link " << linkDesc.localLink.toString() );
    470 
    471                         BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){
    472                                 sendMessage( linkDesc.localLink, msg );
    473                                 delete msg;
    474                         }
    475 
    476                         linkDesc.waitingmsg.clear();
    477                 }
    478 
    479455        } // LINK_STATE_OPEN_REPLY
    480456
     
    503479                        i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator );
    504480                }
    505 
    506                 //
    507                 // delete all pending messages for the link that has been closed
    508                 //
    509 
    510                 BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){
    511                         delete msg;
    512                 }
    513 
    514                 linkDesc.waitingmsg.clear();
    515481
    516482                //
     
    579545                if( (*i).localLink != localLink) continue;
    580546
    581                 BOOST_FOREACH( Message* msg, i->waitingmsg ){
    582                         delete msg;
    583                 }
    584 
    585                 i->waitingmsg.clear();
    586547                linkSet.erase( i );
    587 
    588548                break;
    589549        }
  • source/ariba/communication/BaseCommunication.h

    r3690 r3712  
    4646#include <vector>
    4747#include <iostream>
    48 #include <deque>
    4948#include <algorithm>
    5049#include <boost/foreach.hpp>
     
    7877using std::map;
    7978using std::vector;
    80 using std::deque;
    8179using std::pair;
    8280using std::make_pair;
     
    256254                        remoteEndpoint(desc.remoteEndpoint),
    257255                        linkup(desc.linkup) {
    258 
    259                         BOOST_FOREACH( Message* msg, desc.waitingmsg ){
    260                                 waitingmsg.push_back( msg );
    261                         }
    262256                }
    263257
     
    266260                }
    267261
    268                 LinkID                  localLink;
     262                LinkID                                  localLink;
    269263                const NetworkLocator*   localLocator;
    270                 LinkID                  remoteLink;
     264                LinkID                                  remoteLink;
    271265                const NetworkLocator*   remoteLocator;
    272                 EndpointDescriptor      remoteEndpoint;
    273 
    274                 bool                    linkup;
    275                 deque<Message*>         waitingmsg;
     266                EndpointDescriptor              remoteEndpoint;
     267
     268                bool                                    linkup;
    276269        };
    277270
  • source/ariba/overlay/BaseOverlay.cpp

    r3705 r3712  
    270270        LinkMapping::iterator i = linkMapping.find( link );
    271271
     272        // find the link item to drop
    272273        if( i == linkMapping.end() ){
    273274                logging_warn( "can't drop link, mapping unknown " << link.toString() );
     
    275276        }
    276277
     278        LinkItem item = i->second;
     279
     280        // delete all queued messages
     281        if( item.waitingmsg.size() > 0 ){
     282
     283                logging_warn( "dropping link " << link.toString() <<
     284                        " that has " << item.waitingmsg.size() << " waiting messages" );
     285
     286                item.deleteWaiting();
     287        }
     288
     289        // erase the mapping and drop the link
    277290        linkMapping.erase( i );
    278 
    279         LinkItem item = i->second;
    280291        bc->dropLink( link );
    281292
     293        // tell sideports and listeners of the drop
    282294        item.interface->onLinkDown( link, item.node );
    283295        sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId );
     
    287299
    288300        logging_debug( "baseoverlay is sending data message on link " << link.toString() );
     301
     302        //
     303        // get the mapping for this link
     304        //
    289305
    290306        LinkMapping::iterator i = linkMapping.find( link );
     
    294310        }
    295311
    296         OverlayMsg overmsg(
    297                 OverlayMsg::OverlayMessageTypeData,     i->second.service, nodeId );
     312        i->second.markused();
     313
     314        //
     315        // check if the link is up yet, if its an autlink queue message
     316        //
     317
     318        if( !i->second.linkup ){
     319
     320                if( i->second.autolink ){
     321                        logging_info( "auto link " << link.toString() << " is not up yet, queueing message" );
     322                        i->second.waitingmsg.push_back( new Message(*message) );
     323                } else {
     324                        logging_error("link " << link.toString() << " is not up yet, dropping message" );
     325                }
     326
     327                return -1;
     328        }
     329
     330        //
     331        // send the message through the basecomm
     332        //
     333
     334        OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
    298335        overmsg.encapsulate( const_cast<Message*>(message) );
    299336
    300         i->second.markused();
    301337        return bc->sendMessage( link, &overmsg );
    302338}
     
    308344        LinkMapping::iterator i = linkMapping.begin();
    309345        LinkMapping::iterator iend = linkMapping.end();
     346
     347        //
     348        // see if we find a link for this node and service destination
     349        //
    310350
    311351        for( ; i != iend; i++ ){
     
    316356        }
    317357
     358        //
     359        // if we found no link, create an auto link
     360        //
     361
    318362        if( link == LinkID::UNSPECIFIED ){
    319363
     
    322366                                ". creating auto link ...");
    323367
     368                // call basecomm to create a link
    324369                link = establishLink( node, service );
     370
     371                // this will call onlinkup on us, if everything worked we now have a mapping
    325372                LinkMapping::iterator i = linkMapping.find( link );
     373                i->second.autolink = true;
    326374
    327375                if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){
     
    330378                        return -1;
    331379                }
    332 
    333                 i->second.autolink = true;
    334380
    335381                logging_debug( "establishing autolink in progress to node "
     
    531577        sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId );
    532578
     579        // delete all queued messages
     580        if( i->second.waitingmsg.size() > 0 ){
     581
     582                logging_warn( "dropping link " << id.toString() <<
     583                        " that has " << i->second.waitingmsg.size() << " waiting messages" );
     584
     585                i->second.deleteWaiting();
     586        }
     587
    533588        linkMapping.erase( i );
    534589}
     
    788843
    789844                // update our link mapping information for this link
    790                 bool changed =
    791                         ( i->second.node != sourcenode ) ||
    792                         ( i->second.service != service );
     845                bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
    793846                i->second.node = sourcenode;
    794847                i->second.service = service;
     
    818871
    819872                // ask the service whether it wants to accept this link
    820                 if( iface->onLinkRequest(sourcenode) ){
    821 
    822                         logging_debug("link " << link.toString() <<
    823                                         " has been accepted by service " << service.toString());
    824 
    825                         // call the notification functions
    826                         iface->onLinkUp( link, sourcenode );
    827                         sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );
    828 
    829                 } else {
     873                if( !iface->onLinkRequest(sourcenode) ){
    830874
    831875                        logging_debug("link " << link.toString() <<
     
    836880                        // drop the link
    837881                        dropLink( link );
    838                 }
     882
     883                        return true;
     884                }
     885
     886                //
     887                // link has been accepted, link is now up, send messages out first
     888                //
     889
     890                i->second.linkup = true;
     891                logging_debug("link " << link.toString() <<
     892                                                " has been accepted by service " << service.toString() << " and is now up");
     893
     894                if( i->second.waitingmsg.size() > 0 ){
     895                        logging_info( "sending out queued messages on link " << link.toString() );
     896
     897                        BOOST_FOREACH( Message* msg, i->second.waitingmsg ){
     898                                sendMessage( msg, link );
     899                                delete msg;
     900                        }
     901
     902                        i->second.waitingmsg.clear();
     903                }
     904
     905                // call the notification functions
     906                iface->onLinkUp( link, sourcenode );
     907                sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );
    839908
    840909                return true;
  • source/ariba/overlay/BaseOverlay.h

    r3690 r3712  
    4646#include <list>
    4747#include <vector>
     48#include <deque>
    4849#include <boost/foreach.hpp>
    4950
     
    8081using std::pair;
    8182using std::find;
     83using std::deque;
    8284
    8385using ariba::NodeListener;
     
    420422
    421423                LinkItem() :
    422                         link(LinkID::UNSPECIFIED), node(NodeID::UNSPECIFIED), service(
    423                                         ServiceID::UNSPECIFIED), interface(
    424                                         &CommunicationListener::DEFAULT), autolink(false), lastuse(
    425                                         0) {
     424                        link(LinkID::UNSPECIFIED),
     425                        node(NodeID::UNSPECIFIED),
     426                        service(ServiceID::UNSPECIFIED),
     427                        interface(&CommunicationListener::DEFAULT),
     428                        autolink(false),
     429                        lastuse(0),
     430                        linkup(false){
    426431                }
    427432
    428433                LinkItem(const LinkID& _link, const NodeID& _node,
    429434                                const ServiceID& _service, CommunicationListener* _interface) :
    430                         link(_link), node(_node), service(_service), interface(_interface),
    431                                         autolink(false), lastuse(time(NULL)) {
     435                        link(_link),
     436                        node(_node),
     437                        service(_service),
     438                        interface(_interface),
     439                        autolink(false),
     440                        lastuse(time(NULL)),
     441                        linkup(false){
    432442
    433443                        assert( _interface != NULL );
    434444                }
    435445
     446                LinkItem(const LinkItem& rh) :
     447                        link(rh.link),
     448                        node(rh.node),
     449                        service(rh.service),
     450                        interface(rh.interface),
     451                        autolink(rh.autolink),
     452                        lastuse(rh.lastuse),
     453                        linkup(rh.linkup){
     454
     455                        BOOST_FOREACH( Message* msg, rh.waitingmsg ){
     456                                waitingmsg.push_back( msg );
     457                        }
     458                }
     459
     460                void deleteWaiting(){
     461                        BOOST_FOREACH( Message* msg, waitingmsg ){
     462                                delete msg;
     463                        }
     464                        waitingmsg.clear();
     465                }
     466
    436467                // general information about the link
    437 
    438468                const LinkID link;
    439469                NodeID node;
    440470                ServiceID service;
    441471                CommunicationListener* interface;
     472                bool linkup;
    442473
    443474                // information needed for auto links
     
    448479                bool autolink;
    449480                time_t lastuse;
     481                deque<Message*> waitingmsg;
    450482        };
    451483
     
    467499};
    468500
    469 }
    470 } // namespace ariba, overlay
     501}} // namespace ariba, overlay
    471502
    472503#endif /*BASEOVERLAY_H_*/
  • source/ariba/utility/system/StartupWrapper.cpp

    r3690 r3712  
    108108        // set up again an individual level if you like
    109109        {
    110                 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseOverlay"));
    111                 // logger->setLevel(log4cxx::Level::getInfo());
     110//              log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseOverlay"));
     111//              logger->setLevel(log4cxx::Level::getDebug());
     112        }
     113        {
     114//              log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseCommunication"));
     115//              logger->setLevel(log4cxx::Level::getDebug());
    112116        }
    113117
Note: See TracChangeset for help on using the changeset viewer.