Ignore:
Timestamp:
Aug 11, 2009, 4:11:02 PM (15 years ago)
Author:
Christoph Mayer
Message:

merge noch nicht fertig

Location:
source/ariba/overlay/modules/chord
Files:
2 deleted
4 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/modules/chord/Chord.cpp

    r5803 r5870  
    3838
    3939#include "ariba/overlay/BaseOverlay.h"
     40#include "ariba/overlay/messages/OverlayMsg.h"
    4041
    4142#include "Chord.h"
    42 #include "messages/ChordMessage.h"
     43#include "detail/chord_routing_table.hpp"
     44
    4345#include "messages/Discovery.h"
    44 
    45 #include "detail/chord_routing_table.hpp"
    4646
    4747namespace ariba {
    4848namespace overlay {
     49
     50enum signalMessageTypes {
     51        typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
     52        typeLeave = OverlayMsg::typeSignalingStart + 0x02,
     53};
    4954
    5055typedef chord_routing_table::item route_item;
     
    5762
    5863        // create routing table
     64<<<<<<< .working
    5965        this->table = new chord_routing_table(_nodeid, 2);
     66=======
     67        this->table = new chord_routing_table(_nodeid, 4);
     68>>>>>>> .merge-rechts.r5869
    6069        orphan_removal_counter = 0;
    6170        discovery_count = 0;
     
    7180
    7281/// helper: sets up a link using the base overlay
    73 LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node, const NodeID& remoteRelay ) {
    74         logging_debug("Request to setup link to " << endp.toString() );
     82LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
    7583
    7684        // check if we already have a connection
    7785        for (int i=0; i<table->size(); i++)
    78                 if ((*table)[i]->id == node && !((*table)[i]->info.isUnspecified()))
     86                if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
    7987                        return LinkID::UNSPECIFIED;
    8088
    8189        // check if we are already trying to establish a link
    8290        for (size_t i=0; i<pending.size(); i++)
    83                 if ( pending[i] == node ) {
    84                         logging_debug("Already trying to establish a link to node " << node.toString() );
     91                if ( pending[i] == remote ) {
     92                        logging_debug("Already trying to establish a link to node "
     93                                << remote.toString() );
    8594                        return LinkID::UNSPECIFIED;
    8695                }
    8796
    8897        // adding node to list of pending connections
    89         pending.push_back( node );
     98        pending.push_back( remote );
     99
     100        logging_info("Request to setup link to " << endpoint.toString() );
    90101
    91102        // establish link via base overlay
     103<<<<<<< .working
    92104        return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID, remoteRelay );
     105=======
     106        return baseoverlay.establishLink( endpoint, remote,
     107                        OverlayInterface::OVERLAY_SERVICE_ID );
     108>>>>>>> .merge-rechts.r5869
    93109}
    94110
    95111/// helper: sends a message using the "base overlay"
    96 seqnum_t Chord::send(Message* msg, const LinkID& link) {
     112seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
    97113        if (link.isUnspecified()) return 0;
    98         return baseoverlay.sendMessage(msg, link);
     114        msg->setRelayed(true);
     115        return baseoverlay.send_link( msg, link );
    99116}
    100117
    101118/// sends a discovery message
    102 void Chord::send_discovery_to(const NodeID& destination, int ttl) {
     119void Chord::send_discovery_to(const NodeID& remote, int ttl) {
     120        LinkID link = getNextLinkId(remote);
     121        if ( remote == nodeid || link.isUnspecified()) return;
    103122        if ( table->size() == 0 ) return;
    104123
    105         ChordMessage cmsg(ChordMessage::discovery, nodeid, destination);
    106         Discovery dmsg;
    107         dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    108         dmsg.setSourceRelay(baseoverlay.getRelayNode(destination));
    109         dmsg.setFollowType(Discovery::normal);
    110         dmsg.setTTL((uint8_t) ttl);
    111         cmsg.encapsulate(&dmsg);
    112 
     124        OverlayMsg msg( typeDiscovery );
     125        msg.setRelayed(true);
     126        Discovery dmsg( Discovery::normal, (uint8_t)2, baseoverlay.getEndpointDescriptor() );
     127        msg.encapsulate(&dmsg);
     128
     129<<<<<<< .working
    113130        // get next hop
    114131        const route_item* item = table->get_next_hop(destination);
    115132        if (item!=NULL && !item->info.isUnspecified()) send(&cmsg, item->info);
    116 }
    117 
    118 void Chord::discover_neighbors( const LinkID& lnk ) {
     133=======
     134        // send to node
     135        baseoverlay.send_node( &msg, remote );
     136>>>>>>> .merge-rechts.r5869
     137}
     138
     139void Chord::discover_neighbors( const LinkID& link ) {
     140        uint8_t ttl = 2;
     141        {
     142                // send predecessor discovery
     143                OverlayMsg msg( typeDiscovery );
     144                msg.setRelayed(true);
     145                Discovery dmsg( Discovery::predecessor, ttl,
     146                        baseoverlay.getEndpointDescriptor() );
     147                msg.encapsulate(&dmsg);
     148                send(&msg, link);
     149        }
    119150        {
    120151                // send successor discovery
    121                 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid);
    122                 Discovery dmsg;
    123                 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    124                 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid));
    125                 dmsg.setFollowType(Discovery::successor);
    126                 dmsg.setTTL((uint8_t)4);
    127                 cmsg.encapsulate(&dmsg);
    128                 send(&cmsg, lnk);
    129         }
    130         {
    131                 // send predecessor discovery
    132                 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid);
    133                 Discovery dmsg;
    134                 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    135                 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid));
    136                 dmsg.setFollowType(Discovery::predecessor);
    137                 dmsg.setTTL((uint8_t)4);
    138                 cmsg.encapsulate(&dmsg);
    139                 send(&cmsg, lnk);
     152                OverlayMsg msg( typeDiscovery );
     153                msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
     154                msg.setRelayed(true);
     155                Discovery dmsg( Discovery::successor, ttl,
     156                        baseoverlay.getEndpointDescriptor() );
     157                msg.encapsulate(&dmsg);
     158                send(&msg, link);
    140159        }
    141160}
     
    166185        for (size_t i = 0; i < table->size(); i++) {
    167186                route_item* it = (*table)[i];
    168                 ChordMessage msg(ChordMessage::leave, nodeid, it->id);
    169                 send(&msg,it->info);
    170         }
    171 }
    172 
     187                OverlayMsg msg( typeLeave );
     188                send( &msg, it->info );
     189        }
     190}
     191
     192/// @see OverlayInterface.h
    173193const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
    174194        const route_item* item = table->get(node);
     
    177197}
    178198
    179 void Chord::routeMessage(const NodeID& destnode, Message* msg) {
     199/// @see OverlayInterface.h
     200const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
    180201        // get next hop
     202<<<<<<< .working
    181203        const route_item* item = table->get_next_hop(destnode);
    182 
     204=======
     205        const route_item* item = table->get_next_hop(id);
     206>>>>>>> .merge-rechts.r5869
     207
     208<<<<<<< .working
    183209        // message for this node? yes-> delegate to base overlay
    184210        if (item->id == nodeid || destnode == nodeid)
    185211                baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid );
    186 
     212=======
     213        // returns a unspecified id when this is itself
     214        if (item == NULL || item->id == nodeid)
     215                return LinkID::UNSPECIFIED;
     216>>>>>>> .merge-rechts.r5869
     217
     218<<<<<<< .working
    187219        else { // no-> send to next hop
    188220                ChordMessage cmsg(ChordMessage::route, nodeid, destnode);
     
    190222                send(&cmsg, item->info);
    191223        }
    192 }
    193 
     224=======
     225        /// return routing info
     226        return item->info;
     227>>>>>>> .merge-rechts.r5869
     228}
     229
     230<<<<<<< .working
    194231/// @see OverlayInterface.h
    195232void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) {
     
    214251}
    215252
     253=======
     254>>>>>>> .merge-rechts.r5869
    216255OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
    217256        OverlayInterface::NodeList nodelist;
     
    243282/// @see OverlayInterface.h
    244283void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
    245         logging_debug("link_up: link=" << lnk.toString() << " remote=" <<
     284        logging_info("link_up: link=" << lnk.toString() << " remote=" <<
    246285                        remote.toString() );
    247286        for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
     
    250289                        break;
    251290                }
     291/*
     292        // check if we already have a connection, yes-> do not handle duplicate!
     293        for (int i=0; i<table->size(); i++)
     294                if ((*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()) && (*table)[i]->info != lnk) {
     295
     296                        return;
     297                }
     298*/
     299        if (remote==nodeid) {
     300                baseoverlay.dropLink(lnk);
     301                return;
     302        }
     303
    252304        route_item* item = table->insert(remote);
    253305
     
    257309                                << " with link " << lnk.toString());
    258310                // replace with new link
    259                 if (!item->info.isUnspecified())
     311                if (!item->info.isUnspecified() || item->info!=lnk)
    260312                        baseoverlay.dropLink(item->info);
    261313                item->info = lnk;
     314                // discover neighbors of new overlay neighbor
     315                discover_neighbors( lnk );
     316                showLinks();
    262317        } else { // no-> add orphan entry to routing table
    263318                logging_info("new orphan: " << remote.toString()
     
    266321        }
    267322
    268         discover_neighbors( lnk );
    269 
     323        // erase bootstrap link
    270324        vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
    271         if( it != bootstrapLinks.end() ) {
    272                 discover_neighbors( lnk );
    273                 bootstrapLinks.erase( it );
    274         }
     325        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
    275326}
    276327
     
    282333        // remove link from routing table
    283334        route_item* item = table->get(remote);
    284         if (item!=NULL) item->info = LinkID::UNSPECIFIED;
    285         table->remove(remote);
     335        if (item!=NULL && item->info==lnk) {
     336                item->info = LinkID::UNSPECIFIED;
     337                table->remove(remote);
     338        }
    286339}
    287340
     
    292345
    293346        // decode message
    294         typedef ChordMessage M;
    295         M* m = msg.getMessage()->convert<ChordMessage> ();
     347        OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
    296348        if (m == NULL) return;
    297349
     
    299351        switch (m->getType()) {
    300352
     353<<<<<<< .working
    301354        // invalid message
    302355        case M::invalid:
     
    324377                // discovery request
    325378        case M::discovery: {
     379=======
     380        // discovery request
     381        case typeDiscovery: {
     382>>>>>>> .merge-rechts.r5869
    326383                // decapsulate message
    327384                Discovery* dmsg = m->decapsulate<Discovery> ();
    328                 logging_debug("received discovery message with"
    329                             << " src=" << m->getSource().toString()
    330                                 << " dst=" << m->getDestination().toString()
     385                logging_debug("Received discovery message with"
     386                            << " src=" << m->getSourceNode().toString()
     387                                << " dst=" << m->getDestinationNode().toString()
    331388                                << " ttl=" << (int)dmsg->getTTL()
    332                                 << " type=" << (int)dmsg->getFollowType()
     389                                << " type=" << (int)dmsg->getType()
    333390                );
    334391
    335392                // check if source node can be added to routing table and setup link
    336                 if (m->getSource() != nodeid && table->is_insertable(m->getSource()))
    337                         setup(*dmsg->getSourceEndpoint(), m->getSource(), dmsg->getSourceRelay() );
     393                if (m->getSourceNode() != nodeid
     394                        && table->is_insertable(m->getSourceNode()))
     395                        setup( dmsg->getEndpoint(), m->getSourceNode() );
    338396
    339397                // delegate discovery message
    340                 switch (dmsg->getFollowType()) {
     398                switch (dmsg->getType()) {
    341399
    342400                // normal: route discovery message like every other message
    343401                case Discovery::normal: {
    344402                        // closest node? yes-> split to follow successor and predecessor
    345                         if (table->is_closest_to(m->getDestination())
    346                                 || (table->get_successor()!=NULL && *table->get_successor() == m->getDestination())
    347                                 || (table->get_predesessor()!=NULL && *table->get_predesessor() == m->getDestination())
    348                         ) {
     403                        if ( table->is_closest_to(m->getDestinationNode()) ) {
    349404
    350405                                if (table->get_successor() != NULL) {
    351                                         // send successor message
    352                                         ChordMessage cmsg_s(*m);
    353                                         Discovery dmsg_s(*dmsg);
    354                                         dmsg_s.setFollowType(Discovery::successor);
    355                                         cmsg_s.encapsulate(&dmsg_s);
     406                                        OverlayMsg omsg(*m);
     407                                        dmsg->setType(Discovery::successor);
     408                                        omsg.encapsulate(dmsg);
    356409                                        route_item* succ_item = table->get(*table->get_successor());
    357410                                        logging_debug("Discovery split: routing discovery message to successor "
    358411                                                        << succ_item->id.toString() );
     412<<<<<<< .working
    359413                                        send(&cmsg_s, succ_item->info);
     414=======
     415                                        send(&omsg, succ_item->info);
     416>>>>>>> .merge-rechts.r5869
    360417                                }
    361418
    362419                                // send predecessor message
    363420                                if (table->get_predesessor() != NULL) {
    364                                         ChordMessage cmsg_p(*m);
    365                                         Discovery dmsg_p(*dmsg);
    366                                         dmsg_p.setFollowType(Discovery::predecessor);
    367                                         cmsg_p.encapsulate(&dmsg_p);
     421                                        OverlayMsg omsg(*m);
     422                                        dmsg->setType(Discovery::predecessor);
     423                                        omsg.encapsulate(dmsg);
    368424                                        route_item* pred_item = table->get(
    369425                                                        *table->get_predesessor());
    370426                                        logging_debug("Discovery split: routing discovery message to predecessor "
    371427                                                        << pred_item->id.toString() );
     428<<<<<<< .working
    372429                                        send(&cmsg_p, pred_item->info);
     430=======
     431                                        send( &omsg, pred_item->info);
     432>>>>>>> .merge-rechts.r5869
    373433                                }
    374434                        }
    375435                        // no-> route message
    376436                        else {
     437<<<<<<< .working
    377438                                // find next hop
    378439                                const route_item* item = table->get_next_hop(m->getDestination());
     
    381442                                                item->id.toString() );
    382443                                send(m, item->info);
     444=======
     445                                baseoverlay.send( m, m->getDestinationNode() );
     446>>>>>>> .merge-rechts.r5869
    383447                        }
    384448                        break;
     
    395459
    396460                        const route_item* item = NULL;
    397                         if (dmsg->getFollowType() == Discovery::successor &&
     461                        if (dmsg->getType() == Discovery::successor &&
    398462                                        table->get_successor() != NULL) {
    399463                                item = table->get(*table->get_successor());
     
    405469                        logging_debug("routing discovery message to succ/pred "
    406470                                << item->id.toString() );
    407                         ChordMessage cmsg(*m);
    408                         Discovery dmsg_p(*dmsg);
    409                         cmsg.encapsulate(&dmsg_p);
    410                         send(&cmsg, item->info);
     471                        OverlayMsg omsg(*m);
     472                        omsg.encapsulate(dmsg);
     473                        omsg.setDestinationNode(item->id);
     474                        baseoverlay.send(&omsg, omsg.getDestinationNode());
    411475                        break;
    412476                }}
     
    415479        }
    416480
    417                 // leave
    418         case M::leave: {
     481        // leave
     482        case typeLeave: {
    419483                if (link!=LinkID::UNSPECIFIED) {
    420484                        route_item* item = table->get(remote);
     
    424488                }
    425489                break;
    426         }
    427         }
    428         delete m;
     490        }}
    429491}
    430492
     
    464526                // remove orphan links
    465527                orphan_removal_counter++;
     528<<<<<<< .working
    466529                if (orphan_removal_counter <0 || orphan_removal_counter >= 4) {
     530=======
     531                if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
     532>>>>>>> .merge-rechts.r5869
    467533                        logging_info("Running orphan removal");
    468534                        orphan_removal_counter = 0;
     
    473539                                        table->insert(it->id);
    474540                                        if (it->ref_count==0) {
    475                                                 baseoverlay.dropLink(it->info);
     541                                                LinkID id = it->info;
    476542                                                it->info = LinkID::UNSPECIFIED;
     543                                                baseoverlay.dropLink(id);
    477544                                        }
    478545                                }
     
    480547                }
    481548        }
     549}
     550
     551void Chord::showLinks() {
    482552        logging_info("--- chord routing information ----------------------------------");
    483553        logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
  • source/ariba/overlay/modules/chord/Chord.h

    r5803 r5870  
    5151namespace overlay {
    5252
     53class OverlayMsg;
     54
    5355using ariba::communication::EndpointDescriptor;
    5456using ariba::utility::Timer;
     
    8284        // helper: sets up a link using the "base overlay"
    8385        LinkID setup( const EndpointDescriptor& endp,
    84                 const NodeID& node = NodeID::UNSPECIFIED,
    85                 const NodeID& remoteRelay = NodeID::UNSPECIFIED );
     86                const NodeID& node = NodeID::UNSPECIFIED );
    8687
    8788        // helper: sends a message using the "base overlay"
    88         seqnum_t send( Message* msg, const LinkID& link );
     89        seqnum_t send( OverlayMsg* msg, const LinkID& link );
    8990
    9091        // stabilization: sends a discovery message to the specified neighborhood
     
    9293
    9394        void discover_neighbors( const LinkID& lnk );
     95
     96        void showLinks();
    9497
    9598public:
     
    119122
    120123        /// @see OverlayInterface.h
    121         virtual void routeMessage( const NodeID& destnode, Message* msg );
    122 
    123         /// @see OverlayInterface.h
    124         virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg);
    125 
    126         /// @see OverlayInterface.h
    127124        virtual NodeList getKnownNodes(bool deep = true) const;
    128125
     
    137134                        const LinkID& lnk = LinkID::UNSPECIFIED);
    138135
    139 
    140 
    141136        /// @see Timer.h
    142137        virtual void eventFunction();
     138
    143139};
    144140
  • source/ariba/overlay/modules/chord/messages/Discovery.cpp

    r5681 r5870  
    4444vsznDefault(Discovery);
    4545
    46 Discovery::Discovery(){
    47 }
    48 
    4946Discovery::~Discovery(){
    5047}
  • source/ariba/overlay/modules/chord/messages/Discovery.h

    r5744 r5870  
    4141
    4242#include <vector>
     43
    4344#include "ariba/utility/messages.h"
    4445#include "ariba/utility/serialization.h"
     
    6162        VSERIALIZEABLE;
    6263public:
    63         enum follow_type_ {
    64                 normal = 0,
    65                 successor = 1,
    66                 predecessor = 2
     64        enum type_ {
     65                invalid = 0,
     66                normal = 1,
     67                successor = 2,
     68                predecessor = 3
    6769        };
    6870
    69         Discovery( const Discovery& msg ) {
    70                 this->follow_type = msg.follow_type;
    71                 this->ttl = msg.ttl;
    72                 this->source_relay = msg.source_relay;
    73                 this->source_endpoint = msg.source_endpoint;
     71        Discovery( const Discovery& msg ) : type(msg.type), ttl(msg.ttl),
     72                endpoint(msg.endpoint) {
    7473        }
    75         explicit Discovery();
     74        Discovery( type_ type = invalid, uint8_t ttl = 4,
     75                const EndpointDescriptor& endpoint = EndpointDescriptor::UNSPECIFIED() )
     76        : type(type),  ttl(ttl), endpoint(endpoint) {
     77        }
    7678        virtual ~Discovery();
    7779
    78         const EndpointDescriptor* getSourceEndpoint() const {
    79                 return &source_endpoint;
     80        inline type_ getType() const {
     81                return (type_)type;
    8082        }
    8183
    82         void setSourceEndpoint( const EndpointDescriptor* endpoint ) {
    83                 source_endpoint = *endpoint;
     84        inline void setType( type_ type ) {
     85                this->type = type;
    8486        }
    8587
     
    9294        }
    9395
    94         inline follow_type_ getFollowType() const {
    95                 return (follow_type_)follow_type;
     96        inline const EndpointDescriptor& getEndpoint() const {
     97                return endpoint;
    9698        }
    9799
    98         inline void setFollowType( follow_type_ type ) {
    99                 follow_type = (uint8_t)type;
     100        inline void setEndpoint( const EndpointDescriptor& endpoint ) {
     101                this->endpoint = endpoint;
    100102        }
    101103
    102         inline void setSourceRelay( const NodeID& relay ) {
    103                 source_relay = relay;
    104         }
    105 
    106         inline const NodeID& getSourceRelay() const {
    107                 return source_relay;
    108         }
    109104private:
    110         uint8_t follow_type;
     105        uint8_t type;
    111106        uint8_t ttl;
    112         EndpointDescriptor source_endpoint;
    113         NodeID source_relay;
     107        EndpointDescriptor endpoint;
    114108};
    115109
     
    117111
    118112sznBeginDefault( ariba::overlay::Discovery, X ) {
    119         /// serialize follow-type and time-to-live
    120         X && follow_type && ttl;
    121 
    122         // serialize end-point
    123         X && &source_relay && source_endpoint;
     113        X && type && ttl && endpoint;
    124114} sznEnd();
    125115
Note: See TracChangeset for help on using the changeset viewer.