Changeset 6266


Ignore:
Timestamp:
Sep 25, 2009, 2:30:33 PM (15 years ago)
Author:
mies
Message:

added basic DHT functionality (untested)

Location:
source/ariba
Files:
2 added
13 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/CommunicationListener.cpp

    r3374 r6266  
    6262
    6363bool CommunicationListener::onLinkRequest(const NodeID& remote,
    64                 const DataMessage& msg) {
     64        const DataMessage& msg) {
    6565        return true;
    6666}
    6767
    6868void CommunicationListener::onMessage(const DataMessage& msg,
    69                 const NodeID& remote, const LinkID& lnk) {
     69        const NodeID& remote, const LinkID& lnk) {
    7070}
    7171
     
    7373        return true;
    7474}
     75
     76void CommunicationListener::onKeyValue( const Data& key, const vector<Data>& value ) {
     77
     78}
     79
    7580
    7681// --- extended message functionality ---
  • source/ariba/CommunicationListener.h

    r4075 r6266  
    8686
    8787        // --- sniffing related method ---
     88
    8889        virtual bool onEnableSideportListener();
    8990
     91        // --- dht functionality ---
     92
     93        virtual void onKeyValue( const Data& key, const vector<Data>& value );
     94
    9095        // --- extended message functionality ---
     96
    9197        // virtual void onLinkQoSChanged(const LinkID& lnk, const NodeID& remote,
    9298        //                      const LinkProperties& prop);
    9399
    94100        // --- extended message functionality ---
     101
    95102        //      virtual void onMessageSent(seqnum_t seq_num, bool failed,
    96103        //              const DataMessage& msg = DataMessage::UNSPECIFIED);
    97 
    98         // --- dht functionality ---
    99         //      virtual void onGetResponse( const Identifier<> id, const Message* msg );
    100         //      virtual void onPutResponse( const Identifier<> id, const Message* msg );
    101104};
    102105
  • source/ariba/Makefile.am

    r5993 r6266  
    144144  overlay/messages/JoinReply.cpp \
    145145  overlay/messages/JoinRequest.cpp \
     146  overlay/messages/DHTMessage.cpp \
    146147  overlay/messages/OverlayMsg.cpp
    147148
     
    149150  overlay/messages/JoinReply.h \
    150151  overlay/messages/JoinRequest.h \
     152  overlay/messages/DHTMessage.h\
    151153  overlay/messages/OverlayMsg.h
    152154
  • source/ariba/Node.cpp

    r5412 r6266  
    176176
    177177// service directory
    178 /*
    179  void Node::put(const Identifier<>& key, Message* value) {
    180  }
    181 
    182  void Node::get(const Identifier<>& key) {
    183 
    184  }
    185  */
     178
     179void Node::put( const Data& key, const Data& value, uint16_t ttl ) {
     180        base_overlay->dhtPut(key,value,ttl);
     181}
     182
     183void Node::get( const Data& key, const ServiceID& sid ) {
     184        base_overlay->dhtGet(key,sid);
     185}
    186186
    187187// @see Module.h
  • source/ariba/Node.h

    r3578 r6266  
    282282        bool unbind(CommunicationListener* listener, const ServiceID& sid);
    283283
    284         // --- extension proposal: service directory --
    285         // main-idea: use this methods to register groups/rendevous points inside
    286         // the base overlay via a distributed storage service.
    287         //
    288         //void put(const KeyID& key, Message* value);
    289         //void get(const KeyID& key);
     284        /**
     285         * Adds a key value pair to the DHT
     286         *
     287         * @param key The key data
     288         * @param value The value data
     289         * @param ttl The time to live in seconds
     290         */
     291        void put( const Data& key, const Data& value, uint16_t ttl );
     292
     293        /**
     294         * Queries for values stored in the DHT. Fires an communication event when
     295         * values arrive.
     296         *
     297         * @param key The key data
     298         * @param sid The service that is requesting the values
     299         */
     300        void get( const Data& key, const ServiceID& sid );
     301
     302
    290303        //-------------------------------------------------------------------------
    291304        //
  • source/ariba/overlay/BaseOverlay.cpp

    r6202 r6266  
    5151
    5252#include "ariba/overlay/messages/OverlayMsg.h"
     53#include "ariba/overlay/messages/DHTMessage.h"
    5354#include "ariba/overlay/messages/JoinRequest.h"
    5455#include "ariba/overlay/messages/JoinReply.h"
     
    5859namespace ariba {
    5960namespace overlay {
     61
     62class ValueEntry {
     63public:
     64        ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
     65                last_change(time(NULL)), value(value.clone()) {
     66        }
     67
     68        ~ValueEntry()  {
     69                value.release();
     70        }
     71
     72        void refresh() {
     73                last_update = time(NULL);
     74        }
     75
     76        void set_value( const Data& value ) {
     77                this->value.release();
     78                this->value = value.clone();
     79                this->last_change = time(NULL);
     80                this->last_update = time(NULL);
     81        }
     82
     83        Data get_value() const {
     84                return value;
     85        }
     86
     87        uint16_t get_ttl() const {
     88                return ttl;
     89        }
     90
     91        void set_ttl( uint16_t ttl ) {
     92                this->ttl = ttl;
     93        }
     94
     95        bool is_ttl_elapsed() const {
     96                // is persistent? yes-> always return false
     97                if (ttl==0) return false;
     98
     99                // return true, if ttl is elapsed
     100                return ( difftime( time(NULL), this->last_update ) >= ttl );
     101        }
     102
     103private:
     104        uint16_t ttl;
     105        time_t last_update;
     106        time_t last_change;
     107        Data value;
     108};
     109
     110class DHTEntry {
     111public:
     112        Data key;
     113        vector<ValueEntry> values;
     114
     115        vector<Data> get_values() {
     116                vector<Data> vect;
     117                BOOST_FOREACH( ValueEntry& e, values )
     118                        vect.push_back( e.get_value() );
     119                return vect;
     120        }
     121};
     122
     123class DHT {
     124public:
     125        typedef vector<DHTEntry> Entries;
     126        typedef vector<ValueEntry> Values;
     127        Entries entries;
     128
     129        static bool equals( const Data& lhs, const Data& rhs ) {
     130                if (rhs.getLength()!=lhs.getLength()) return false;
     131                for (int i=0; i<lhs.getLength()/8; i++)
     132                        if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
     133                return true;
     134        }
     135
     136        void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
     137
     138                // find entry
     139                for (size_t i=0; i<entries.size(); i++) {
     140                        DHTEntry& entry = entries.at(i);
     141
     142                        // check if key is already known
     143                        if ( equals(entry.key, key) ) {
     144
     145                                // check if value is already in values list
     146                                for (size_t j=0; j<entry.values.size(); j++) {
     147                                        // found value already? yes-> refresh ttl
     148                                        if ( equals(entry.values[j].get_value(), value) ) {
     149                                                entry.values[j].refresh();
     150                                                std::cout << "DHT: Republished value. Refreshing TTL."
     151                                                        << std::endl;
     152                                                return;
     153                                        }
     154                                }
     155
     156                                // new value-> add to entry
     157                                std::cout << "DHT: Added value to "
     158                                        << " key=" << key << " with value=" << value << std::endl;
     159                                entry.values.push_back( ValueEntry( value ) );
     160                                entry.values.back().set_ttl(ttl);
     161                                return;
     162                        }
     163                }
     164
     165                // key is unknown-> add key value pair
     166                std::cout << "DHT: New key value pair "
     167                        << " key=" << key << " with value=" << value << std::endl;
     168
     169                // add new entry
     170                entries.push_back( DHTEntry() );
     171                DHTEntry& entry = entries.back();
     172                entry.key = key.clone();
     173                entry.values.push_back( ValueEntry(value) );
     174                entry.values.back().set_ttl(ttl);
     175        }
     176
     177        vector<Data> get( const Data& key ) {
     178                // find entry
     179                for (size_t i=0; i<entries.size(); i++) {
     180                        DHTEntry& entry = entries.at(i);
     181                        if ( equals(entry.key,key) )
     182                                return entry.get_values();
     183                }
     184                return vector<Data>();
     185        }
     186
     187        bool remove( const Data& key ) {
     188                // find entry
     189                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     190                        DHTEntry& entry = *i;
     191
     192                        // found? yes-> delete entry
     193                        if ( equals(entry.key, key) ) {
     194                                i = entries.erase(i);
     195                                return true;
     196                        }
     197                }
     198                return false;
     199        }
     200
     201        bool remove( const Data& key, const Data& value ) {
     202                // find entry
     203                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     204                        DHTEntry& entry = *i;
     205
     206                        // found? yes-> try to find value
     207                        if ( equals(entry.key, key) ) {
     208                                for (Values::iterator j = entry.values.begin();
     209                                                j != entry.values.end(); j++) {
     210
     211                                        // value found? yes-> delete
     212                                        if (equals(j->get_value(), value)) {
     213                                                j = entry.values.erase(j);
     214                                                return true;
     215                                        }
     216                                }
     217                        }
     218                }
     219                return false;
     220        }
     221
     222        void cleanup() {
     223                // find entry
     224                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     225                        DHTEntry& entry = *i;
     226
     227                        for (Values::iterator j = entry.values.begin();
     228                                        j != entry.values.end(); j++) {
     229
     230                                // value found? yes-> delete
     231                                if (j->is_ttl_elapsed())
     232                                        j = entry.values.erase(j);
     233                        }
     234
     235                        if (entry.values.size()==0) i = entries.erase(i);
     236                }
     237        }
     238};
     239
     240// ----------------------------------------------------------------------------
    60241
    61242/* *****************************************************************************
     
    545726        spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
    546727        sideport(&SideportListener::DEFAULT), started(false), counter(0) {
     728        dht = new DHT();
    547729}
    548730
    549731BaseOverlay::~BaseOverlay() {
     732        delete dht;
    550733}
    551734
     
    15821765        overlayMsg->addRouteRecord(nodeId);
    15831766
     1767        // handle dht messages (do not route)
     1768        if (overlayMsg->isDHTMessage())
     1769                return handleDHTMessage(overlayMsg);
     1770
    15841771        // handle signaling messages (do not route!)
    15851772        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
     
    16011788                return true;
    16021789        }
     1790
     1791        // handle DHT response messages
     1792        if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
     1793                bool ret = handleDHTMessage(overlayMsg);
     1794                delete overlayMsg;
     1795                return ret;
     1796        }
     1797
    16031798
    16041799        // handle base overlay message
     
    16991894}
    17001895
     1896
     1897// ----------------------------------------------------------------------------
     1898
     1899
     1900
     1901/// stabilize DHT state
     1902void BaseOverlay::stabilizeDHT() {
     1903
     1904}
     1905
     1906// handle DHT messages
     1907bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
     1908
     1909        // decapsulate message
     1910        logging_debug("received DHT message");
     1911        DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
     1912
     1913        // handle DHT data message
     1914        if (msg->getType()==OverlayMsg::typeDHTData) {
     1915                const ServiceID& service = msg->getService();
     1916                logging_debug( "Received DHT data for service " << service.toString() );
     1917
     1918                // delegate data message
     1919                getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
     1920                return true;
     1921        }
     1922
     1923        // route message to closest node
     1924        if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
     1925                logging_debug("Routing DHT message to closest node "
     1926                        << " from " << msg->getSourceNode()
     1927                        << " to " << msg->getDestinationNode()
     1928                );
     1929                route( msg );
     1930                delete msg;
     1931                return true;
     1932        }
     1933
     1934        // now, we are the closest node...
     1935        switch (msg->getType()) {
     1936        case OverlayMsg::typeDHTPut: {
     1937                BOOST_FOREACH( Data value, dhtMsg->getValues() )
     1938                        dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
     1939                break;
     1940        }
     1941
     1942        case OverlayMsg::typeDHTGet: {
     1943                vector<Data> vect = dht->get(dhtMsg->getKey());
     1944                OverlayMsg omsg(*msg);
     1945                omsg.swapRoles();
     1946                omsg.setType(OverlayMsg::typeDHTData);
     1947                DHTMessage dhtmsg(dhtMsg->getKey(), vect);
     1948                omsg.encapsulate(&dhtmsg);
     1949                this->send(&omsg, omsg.getDestinationNode());
     1950                break;
     1951        }
     1952
     1953        case OverlayMsg::typeDHTRemove: {
     1954                if (dhtMsg->hasValues()) {
     1955                        BOOST_FOREACH( Data value, dhtMsg->getValues() )
     1956                                        dht->remove(dhtMsg->getKey(), value );
     1957                } else
     1958                        dht->remove( dhtMsg->getKey() );
     1959                break;
     1960        }
     1961
     1962        default:
     1963                logging_error("DHT Message type unknown.");
     1964                return false;
     1965        }
     1966        delete msg;
     1967        return true;
     1968}
     1969
     1970/// put a value to the DHT with a ttl given in seconds
     1971void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) {
     1972        // calculate hash
     1973        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1974        OverlayMsg msg(OverlayMsg::typeDHTPut);
     1975        DHTMessage dhtmsg(key,value);
     1976        dhtmsg.setTTL(ttl);
     1977        msg.setDestinationNode(dest);
     1978        msg.encapsulate( &dhtmsg );
     1979        send(&msg, dest);
     1980}
     1981
     1982/// removes a key value pair from the DHT
     1983void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
     1984        // calculate hash
     1985        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1986        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     1987        DHTMessage dhtmsg(key,value);
     1988        msg.setDestinationNode(dest);
     1989        msg.encapsulate( &dhtmsg );
     1990        send(&msg, dest);
     1991}
     1992
     1993/// removes all data stored at the given key
     1994void BaseOverlay::dhtRemove( const Data& key ) {
     1995        // calculate hash
     1996        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1997        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     1998        DHTMessage dhtmsg(key);
     1999        msg.setDestinationNode(dest);
     2000        msg.encapsulate( &dhtmsg );
     2001        send(&msg, dest);
     2002}
     2003
     2004/// requests data stored using key
     2005void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
     2006        // calculate hash
     2007        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     2008        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     2009        DHTMessage dhtmsg(key);
     2010        msg.setDestinationNode(dest);
     2011        msg.setService(service);
     2012        msg.encapsulate( &dhtmsg );
     2013        send(&msg, dest);
     2014}
     2015
     2016
    17012017}} // namespace ariba, overlay
  • source/ariba/overlay/BaseOverlay.h

    r5916 r6266  
    122122class LinkDescriptor;
    123123class OverlayMsg;
     124class DHT;
    124125
    125126class BaseOverlay: public MessageReceiver,
     
    284285        void leaveSpoVNet();
    285286
     287        /// put a value to the DHT with a ttl given in seconds
     288        void dhtPut( const Data& key, const Data& value, int ttl = 0);
     289
     290        /// removes a key value pair from the DHT
     291        void dhtRemove( const Data& key, const Data& value );
     292
     293        /// removes all data stored at the given key
     294        void dhtRemove( const Data& key );
     295
     296        /// requests data stored using key
     297        void dhtGet( const Data& key, const ServiceID& service );
     298
    286299protected:
    287 
    288300        /**
    289301         * @see ariba::communication::CommunicationEvents.h
     
    397409        bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink );
    398410
     411        // handle DHT messages
     412        bool handleDHTMessage( OverlayMsg* msg );
     413
    399414        // handle link messages
    400415        bool handleLinkRequest( OverlayMsg* msg, LinkDescriptor* ld );
     
    403418        bool handleLinkDirect( OverlayMsg* msg, LinkDescriptor* ld );
    404419        bool handleLinkAlive( OverlayMsg* msg, LinkDescriptor* ld );
     420
    405421
    406422        // link state handling -----------------------------------------------------
     
    487503                bool ignore_down = false );
    488504
     505        // distributed hashtable handling ------------------------------------------
     506
     507        DHT* dht;
     508
     509        void stabilizeDHT();
     510
    489511        // misc --------------------------------------------------------------------
    490512
  • source/ariba/overlay/messages/OverlayMsg.h

    r5902 r6266  
    8888                typeLinkDirect  = 0x34, ///< direct connection has been established
    8989                typeLinkAlive   = 0x35, ///< keep-alive message
     90
     91                // DHT routed messages
     92                maskDHT                 = 0x40, ///< bit mask for dht messages
     93                typeDHTPut      = 0x41, ///< DHT put operation
     94                typeDHTGet      = 0x42, ///< DHT get operation
     95                typeDHTRemove   = 0x43, ///< DHT remove operation
     96
     97                /// DHT response messages
     98                maskDHTResponse = 0x50, ///< bit mask for dht responses
     99                typeDHTData     = 0x51, ///< DHT get data
    90100
    91101                // topology signaling
     
    179189        }
    180190
    181 
    182191        bool containsSourceEndpoint() const {
    183192                return (flags & 0x20)!=0;
     
    186195        void setContainsSourceEndpoint(bool contains_endpoint) {
    187196                if (contains_endpoint) flags |= 0x20; else flags &= ~0x20;
     197        }
     198
     199        bool isDHTMessage() const {
     200                return hasTypeMask(maskDHT);
    188201        }
    189202
  • source/ariba/overlay/modules/OverlayInterface.h

    r5870 r6266  
    116116        virtual const EndpointDescriptor& resolveNode(const NodeID& node) = 0;
    117117
     118
     119        /**
     120         * Returns true if this is the closest node to the given node
     121         * identifier.
     122         *
     123         * @param node The node identifier to compare with
     124         * @return True if this is the closest node to the given node identifier
     125         */
     126        virtual bool isClosestNodeTo( const NodeID& node ) = 0;
     127
    118128        /**
    119129         * Returns the nodes known to this overlay.
  • source/ariba/overlay/modules/chord/Chord.cpp

    r6198 r6266  
    181181        if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
    182182        return baseoverlay.getEndpointDescriptor(item->info);
     183}
     184
     185/// @see OverlayInterface.h
     186bool Chord::isClosestNodeTo( const NodeID& node ) {
     187        return table->is_closest_to(node);
    183188}
    184189
  • source/ariba/overlay/modules/chord/Chord.h

    r5876 r6266  
    123123
    124124        /// @see OverlayInterface.h
     125        virtual bool isClosestNodeTo( const NodeID& node );
     126
     127        /// @see OverlayInterface.h
    125128        virtual NodeList getKnownNodes(bool deep = true) const;
    126129
  • source/ariba/overlay/modules/onehop/OneHop.cpp

    r5743 r6266  
    7979}
    8080
     81
     82/// @see OverlayInterface.h
     83bool OneHop::isClosestNodeTo( const NodeID& node ) {
     84        throw "NOT IMPLEMENTED!";
     85        return false;
     86}
     87
    8188void OneHop::routeMessage(const NodeID& destnode, Message* msg){
    8289
  • source/ariba/overlay/modules/onehop/OneHop.h

    r5624 r6266  
    8181
    8282        /// @see OverlayInterface.h
     83        virtual bool isClosestNodeTo( const NodeID& node );
     84
     85        /// @see OverlayInterface.h
    8386        virtual const LinkID& getNextLinkId( const NodeID& id ) const;
    8487
Note: See TracChangeset for help on using the changeset viewer.