Ignore:
Timestamp:
Aug 11, 2009, 4:11:02 PM (15 years ago)
Author:
Christoph Mayer
Message:

merge noch nicht fertig

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/modules/chord/Chord.cpp

    r5803 r5870  
    3838
    3939#include "ariba/overlay/BaseOverlay.h"
     40#include "ariba/overlay/messages/OverlayMsg.h"
    4041
    4142#include "Chord.h"
    42 #include "messages/ChordMessage.h"
     43#include "detail/chord_routing_table.hpp"
     44
    4345#include "messages/Discovery.h"
    44 
    45 #include "detail/chord_routing_table.hpp"
    4646
    4747namespace ariba {
    4848namespace overlay {
     49
     50enum signalMessageTypes {
     51        typeDiscovery = OverlayMsg::typeSignalingStart + 0x01,
     52        typeLeave = OverlayMsg::typeSignalingStart + 0x02,
     53};
    4954
    5055typedef chord_routing_table::item route_item;
     
    5762
    5863        // create routing table
     64<<<<<<< .working
    5965        this->table = new chord_routing_table(_nodeid, 2);
     66=======
     67        this->table = new chord_routing_table(_nodeid, 4);
     68>>>>>>> .merge-rechts.r5869
    6069        orphan_removal_counter = 0;
    6170        discovery_count = 0;
     
    7180
    7281/// helper: sets up a link using the base overlay
    73 LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node, const NodeID& remoteRelay ) {
    74         logging_debug("Request to setup link to " << endp.toString() );
     82LinkID Chord::setup(const EndpointDescriptor& endpoint, const NodeID& remote ) {
    7583
    7684        // check if we already have a connection
    7785        for (int i=0; i<table->size(); i++)
    78                 if ((*table)[i]->id == node && !((*table)[i]->info.isUnspecified()))
     86                if ((*table)[i]->ref_count > 0 && (*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()))
    7987                        return LinkID::UNSPECIFIED;
    8088
    8189        // check if we are already trying to establish a link
    8290        for (size_t i=0; i<pending.size(); i++)
    83                 if ( pending[i] == node ) {
    84                         logging_debug("Already trying to establish a link to node " << node.toString() );
     91                if ( pending[i] == remote ) {
     92                        logging_debug("Already trying to establish a link to node "
     93                                << remote.toString() );
    8594                        return LinkID::UNSPECIFIED;
    8695                }
    8796
    8897        // adding node to list of pending connections
    89         pending.push_back( node );
     98        pending.push_back( remote );
     99
     100        logging_info("Request to setup link to " << endpoint.toString() );
    90101
    91102        // establish link via base overlay
     103<<<<<<< .working
    92104        return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID, remoteRelay );
     105=======
     106        return baseoverlay.establishLink( endpoint, remote,
     107                        OverlayInterface::OVERLAY_SERVICE_ID );
     108>>>>>>> .merge-rechts.r5869
    93109}
    94110
    95111/// helper: sends a message using the "base overlay"
    96 seqnum_t Chord::send(Message* msg, const LinkID& link) {
     112seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) {
    97113        if (link.isUnspecified()) return 0;
    98         return baseoverlay.sendMessage(msg, link);
     114        msg->setRelayed(true);
     115        return baseoverlay.send_link( msg, link );
    99116}
    100117
    101118/// sends a discovery message
    102 void Chord::send_discovery_to(const NodeID& destination, int ttl) {
     119void Chord::send_discovery_to(const NodeID& remote, int ttl) {
     120        LinkID link = getNextLinkId(remote);
     121        if ( remote == nodeid || link.isUnspecified()) return;
    103122        if ( table->size() == 0 ) return;
    104123
    105         ChordMessage cmsg(ChordMessage::discovery, nodeid, destination);
    106         Discovery dmsg;
    107         dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    108         dmsg.setSourceRelay(baseoverlay.getRelayNode(destination));
    109         dmsg.setFollowType(Discovery::normal);
    110         dmsg.setTTL((uint8_t) ttl);
    111         cmsg.encapsulate(&dmsg);
    112 
     124        OverlayMsg msg( typeDiscovery );
     125        msg.setRelayed(true);
     126        Discovery dmsg( Discovery::normal, (uint8_t)2, baseoverlay.getEndpointDescriptor() );
     127        msg.encapsulate(&dmsg);
     128
     129<<<<<<< .working
    113130        // get next hop
    114131        const route_item* item = table->get_next_hop(destination);
    115132        if (item!=NULL && !item->info.isUnspecified()) send(&cmsg, item->info);
    116 }
    117 
    118 void Chord::discover_neighbors( const LinkID& lnk ) {
     133=======
     134        // send to node
     135        baseoverlay.send_node( &msg, remote );
     136>>>>>>> .merge-rechts.r5869
     137}
     138
     139void Chord::discover_neighbors( const LinkID& link ) {
     140        uint8_t ttl = 2;
     141        {
     142                // send predecessor discovery
     143                OverlayMsg msg( typeDiscovery );
     144                msg.setRelayed(true);
     145                Discovery dmsg( Discovery::predecessor, ttl,
     146                        baseoverlay.getEndpointDescriptor() );
     147                msg.encapsulate(&dmsg);
     148                send(&msg, link);
     149        }
    119150        {
    120151                // send successor discovery
    121                 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid);
    122                 Discovery dmsg;
    123                 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    124                 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid));
    125                 dmsg.setFollowType(Discovery::successor);
    126                 dmsg.setTTL((uint8_t)4);
    127                 cmsg.encapsulate(&dmsg);
    128                 send(&cmsg, lnk);
    129         }
    130         {
    131                 // send predecessor discovery
    132                 ChordMessage cmsg(ChordMessage::discovery, nodeid, nodeid);
    133                 Discovery dmsg;
    134                 dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor());
    135                 dmsg.setSourceRelay(baseoverlay.getRelayNode(nodeid));
    136                 dmsg.setFollowType(Discovery::predecessor);
    137                 dmsg.setTTL((uint8_t)4);
    138                 cmsg.encapsulate(&dmsg);
    139                 send(&cmsg, lnk);
     152                OverlayMsg msg( typeDiscovery );
     153                msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() );
     154                msg.setRelayed(true);
     155                Discovery dmsg( Discovery::successor, ttl,
     156                        baseoverlay.getEndpointDescriptor() );
     157                msg.encapsulate(&dmsg);
     158                send(&msg, link);
    140159        }
    141160}
     
    166185        for (size_t i = 0; i < table->size(); i++) {
    167186                route_item* it = (*table)[i];
    168                 ChordMessage msg(ChordMessage::leave, nodeid, it->id);
    169                 send(&msg,it->info);
    170         }
    171 }
    172 
     187                OverlayMsg msg( typeLeave );
     188                send( &msg, it->info );
     189        }
     190}
     191
     192/// @see OverlayInterface.h
    173193const EndpointDescriptor& Chord::resolveNode(const NodeID& node) {
    174194        const route_item* item = table->get(node);
     
    177197}
    178198
    179 void Chord::routeMessage(const NodeID& destnode, Message* msg) {
     199/// @see OverlayInterface.h
     200const LinkID& Chord::getNextLinkId( const NodeID& id ) const {
    180201        // get next hop
     202<<<<<<< .working
    181203        const route_item* item = table->get_next_hop(destnode);
    182 
     204=======
     205        const route_item* item = table->get_next_hop(id);
     206>>>>>>> .merge-rechts.r5869
     207
     208<<<<<<< .working
    183209        // message for this node? yes-> delegate to base overlay
    184210        if (item->id == nodeid || destnode == nodeid)
    185211                baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid );
    186 
     212=======
     213        // returns a unspecified id when this is itself
     214        if (item == NULL || item->id == nodeid)
     215                return LinkID::UNSPECIFIED;
     216>>>>>>> .merge-rechts.r5869
     217
     218<<<<<<< .working
    187219        else { // no-> send to next hop
    188220                ChordMessage cmsg(ChordMessage::route, nodeid, destnode);
     
    190222                send(&cmsg, item->info);
    191223        }
    192 }
    193 
     224=======
     225        /// return routing info
     226        return item->info;
     227>>>>>>> .merge-rechts.r5869
     228}
     229
     230<<<<<<< .working
    194231/// @see OverlayInterface.h
    195232void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) {
     
    214251}
    215252
     253=======
     254>>>>>>> .merge-rechts.r5869
    216255OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const {
    217256        OverlayInterface::NodeList nodelist;
     
    243282/// @see OverlayInterface.h
    244283void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) {
    245         logging_debug("link_up: link=" << lnk.toString() << " remote=" <<
     284        logging_info("link_up: link=" << lnk.toString() << " remote=" <<
    246285                        remote.toString() );
    247286        for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++)
     
    250289                        break;
    251290                }
     291/*
     292        // check if we already have a connection, yes-> do not handle duplicate!
     293        for (int i=0; i<table->size(); i++)
     294                if ((*table)[i]->id == remote && !((*table)[i]->info.isUnspecified()) && (*table)[i]->info != lnk) {
     295
     296                        return;
     297                }
     298*/
     299        if (remote==nodeid) {
     300                baseoverlay.dropLink(lnk);
     301                return;
     302        }
     303
    252304        route_item* item = table->insert(remote);
    253305
     
    257309                                << " with link " << lnk.toString());
    258310                // replace with new link
    259                 if (!item->info.isUnspecified())
     311                if (!item->info.isUnspecified() || item->info!=lnk)
    260312                        baseoverlay.dropLink(item->info);
    261313                item->info = lnk;
     314                // discover neighbors of new overlay neighbor
     315                discover_neighbors( lnk );
     316                showLinks();
    262317        } else { // no-> add orphan entry to routing table
    263318                logging_info("new orphan: " << remote.toString()
     
    266321        }
    267322
    268         discover_neighbors( lnk );
    269 
     323        // erase bootstrap link
    270324        vector<LinkID>::iterator it = std::find(bootstrapLinks.begin(), bootstrapLinks.end(), lnk);
    271         if( it != bootstrapLinks.end() ) {
    272                 discover_neighbors( lnk );
    273                 bootstrapLinks.erase( it );
    274         }
     325        if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
    275326}
    276327
     
    282333        // remove link from routing table
    283334        route_item* item = table->get(remote);
    284         if (item!=NULL) item->info = LinkID::UNSPECIFIED;
    285         table->remove(remote);
     335        if (item!=NULL && item->info==lnk) {
     336                item->info = LinkID::UNSPECIFIED;
     337                table->remove(remote);
     338        }
    286339}
    287340
     
    292345
    293346        // decode message
    294         typedef ChordMessage M;
    295         M* m = msg.getMessage()->convert<ChordMessage> ();
     347        OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());
    296348        if (m == NULL) return;
    297349
     
    299351        switch (m->getType()) {
    300352
     353<<<<<<< .working
    301354        // invalid message
    302355        case M::invalid:
     
    324377                // discovery request
    325378        case M::discovery: {
     379=======
     380        // discovery request
     381        case typeDiscovery: {
     382>>>>>>> .merge-rechts.r5869
    326383                // decapsulate message
    327384                Discovery* dmsg = m->decapsulate<Discovery> ();
    328                 logging_debug("received discovery message with"
    329                             << " src=" << m->getSource().toString()
    330                                 << " dst=" << m->getDestination().toString()
     385                logging_debug("Received discovery message with"
     386                            << " src=" << m->getSourceNode().toString()
     387                                << " dst=" << m->getDestinationNode().toString()
    331388                                << " ttl=" << (int)dmsg->getTTL()
    332                                 << " type=" << (int)dmsg->getFollowType()
     389                                << " type=" << (int)dmsg->getType()
    333390                );
    334391
    335392                // check if source node can be added to routing table and setup link
    336                 if (m->getSource() != nodeid && table->is_insertable(m->getSource()))
    337                         setup(*dmsg->getSourceEndpoint(), m->getSource(), dmsg->getSourceRelay() );
     393                if (m->getSourceNode() != nodeid
     394                        && table->is_insertable(m->getSourceNode()))
     395                        setup( dmsg->getEndpoint(), m->getSourceNode() );
    338396
    339397                // delegate discovery message
    340                 switch (dmsg->getFollowType()) {
     398                switch (dmsg->getType()) {
    341399
    342400                // normal: route discovery message like every other message
    343401                case Discovery::normal: {
    344402                        // closest node? yes-> split to follow successor and predecessor
    345                         if (table->is_closest_to(m->getDestination())
    346                                 || (table->get_successor()!=NULL && *table->get_successor() == m->getDestination())
    347                                 || (table->get_predesessor()!=NULL && *table->get_predesessor() == m->getDestination())
    348                         ) {
     403                        if ( table->is_closest_to(m->getDestinationNode()) ) {
    349404
    350405                                if (table->get_successor() != NULL) {
    351                                         // send successor message
    352                                         ChordMessage cmsg_s(*m);
    353                                         Discovery dmsg_s(*dmsg);
    354                                         dmsg_s.setFollowType(Discovery::successor);
    355                                         cmsg_s.encapsulate(&dmsg_s);
     406                                        OverlayMsg omsg(*m);
     407                                        dmsg->setType(Discovery::successor);
     408                                        omsg.encapsulate(dmsg);
    356409                                        route_item* succ_item = table->get(*table->get_successor());
    357410                                        logging_debug("Discovery split: routing discovery message to successor "
    358411                                                        << succ_item->id.toString() );
     412<<<<<<< .working
    359413                                        send(&cmsg_s, succ_item->info);
     414=======
     415                                        send(&omsg, succ_item->info);
     416>>>>>>> .merge-rechts.r5869
    360417                                }
    361418
    362419                                // send predecessor message
    363420                                if (table->get_predesessor() != NULL) {
    364                                         ChordMessage cmsg_p(*m);
    365                                         Discovery dmsg_p(*dmsg);
    366                                         dmsg_p.setFollowType(Discovery::predecessor);
    367                                         cmsg_p.encapsulate(&dmsg_p);
     421                                        OverlayMsg omsg(*m);
     422                                        dmsg->setType(Discovery::predecessor);
     423                                        omsg.encapsulate(dmsg);
    368424                                        route_item* pred_item = table->get(
    369425                                                        *table->get_predesessor());
    370426                                        logging_debug("Discovery split: routing discovery message to predecessor "
    371427                                                        << pred_item->id.toString() );
     428<<<<<<< .working
    372429                                        send(&cmsg_p, pred_item->info);
     430=======
     431                                        send( &omsg, pred_item->info);
     432>>>>>>> .merge-rechts.r5869
    373433                                }
    374434                        }
    375435                        // no-> route message
    376436                        else {
     437<<<<<<< .working
    377438                                // find next hop
    378439                                const route_item* item = table->get_next_hop(m->getDestination());
     
    381442                                                item->id.toString() );
    382443                                send(m, item->info);
     444=======
     445                                baseoverlay.send( m, m->getDestinationNode() );
     446>>>>>>> .merge-rechts.r5869
    383447                        }
    384448                        break;
     
    395459
    396460                        const route_item* item = NULL;
    397                         if (dmsg->getFollowType() == Discovery::successor &&
     461                        if (dmsg->getType() == Discovery::successor &&
    398462                                        table->get_successor() != NULL) {
    399463                                item = table->get(*table->get_successor());
     
    405469                        logging_debug("routing discovery message to succ/pred "
    406470                                << item->id.toString() );
    407                         ChordMessage cmsg(*m);
    408                         Discovery dmsg_p(*dmsg);
    409                         cmsg.encapsulate(&dmsg_p);
    410                         send(&cmsg, item->info);
     471                        OverlayMsg omsg(*m);
     472                        omsg.encapsulate(dmsg);
     473                        omsg.setDestinationNode(item->id);
     474                        baseoverlay.send(&omsg, omsg.getDestinationNode());
    411475                        break;
    412476                }}
     
    415479        }
    416480
    417                 // leave
    418         case M::leave: {
     481        // leave
     482        case typeLeave: {
    419483                if (link!=LinkID::UNSPECIFIED) {
    420484                        route_item* item = table->get(remote);
     
    424488                }
    425489                break;
    426         }
    427         }
    428         delete m;
     490        }}
    429491}
    430492
     
    464526                // remove orphan links
    465527                orphan_removal_counter++;
     528<<<<<<< .working
    466529                if (orphan_removal_counter <0 || orphan_removal_counter >= 4) {
     530=======
     531                if (orphan_removal_counter <0 || orphan_removal_counter >= 2) {
     532>>>>>>> .merge-rechts.r5869
    467533                        logging_info("Running orphan removal");
    468534                        orphan_removal_counter = 0;
     
    473539                                        table->insert(it->id);
    474540                                        if (it->ref_count==0) {
    475                                                 baseoverlay.dropLink(it->info);
     541                                                LinkID id = it->info;
    476542                                                it->info = LinkID::UNSPECIFIED;
     543                                                baseoverlay.dropLink(id);
    477544                                        }
    478545                                }
     
    480547                }
    481548        }
     549}
     550
     551void Chord::showLinks() {
    482552        logging_info("--- chord routing information ----------------------------------");
    483553        logging_info("predecessor: " << (table->get_predesessor()==NULL? "<none>" :
Note: See TracChangeset for help on using the changeset viewer.