Ignore:
Timestamp:
Sep 25, 2009, 2:30:33 PM (15 years ago)
Author:
mies
Message:

added basic DHT functionality (untested)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/BaseOverlay.cpp

    r6202 r6266  
    5151
    5252#include "ariba/overlay/messages/OverlayMsg.h"
     53#include "ariba/overlay/messages/DHTMessage.h"
    5354#include "ariba/overlay/messages/JoinRequest.h"
    5455#include "ariba/overlay/messages/JoinReply.h"
     
    5859namespace ariba {
    5960namespace overlay {
     61
     62class ValueEntry {
     63public:
     64        ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
     65                last_change(time(NULL)), value(value.clone()) {
     66        }
     67
     68        ~ValueEntry()  {
     69                value.release();
     70        }
     71
     72        void refresh() {
     73                last_update = time(NULL);
     74        }
     75
     76        void set_value( const Data& value ) {
     77                this->value.release();
     78                this->value = value.clone();
     79                this->last_change = time(NULL);
     80                this->last_update = time(NULL);
     81        }
     82
     83        Data get_value() const {
     84                return value;
     85        }
     86
     87        uint16_t get_ttl() const {
     88                return ttl;
     89        }
     90
     91        void set_ttl( uint16_t ttl ) {
     92                this->ttl = ttl;
     93        }
     94
     95        bool is_ttl_elapsed() const {
     96                // is persistent? yes-> always return false
     97                if (ttl==0) return false;
     98
     99                // return true, if ttl is elapsed
     100                return ( difftime( time(NULL), this->last_update ) >= ttl );
     101        }
     102
     103private:
     104        uint16_t ttl;
     105        time_t last_update;
     106        time_t last_change;
     107        Data value;
     108};
     109
     110class DHTEntry {
     111public:
     112        Data key;
     113        vector<ValueEntry> values;
     114
     115        vector<Data> get_values() {
     116                vector<Data> vect;
     117                BOOST_FOREACH( ValueEntry& e, values )
     118                        vect.push_back( e.get_value() );
     119                return vect;
     120        }
     121};
     122
     123class DHT {
     124public:
     125        typedef vector<DHTEntry> Entries;
     126        typedef vector<ValueEntry> Values;
     127        Entries entries;
     128
     129        static bool equals( const Data& lhs, const Data& rhs ) {
     130                if (rhs.getLength()!=lhs.getLength()) return false;
     131                for (int i=0; i<lhs.getLength()/8; i++)
     132                        if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
     133                return true;
     134        }
     135
     136        void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
     137
     138                // find entry
     139                for (size_t i=0; i<entries.size(); i++) {
     140                        DHTEntry& entry = entries.at(i);
     141
     142                        // check if key is already known
     143                        if ( equals(entry.key, key) ) {
     144
     145                                // check if value is already in values list
     146                                for (size_t j=0; j<entry.values.size(); j++) {
     147                                        // found value already? yes-> refresh ttl
     148                                        if ( equals(entry.values[j].get_value(), value) ) {
     149                                                entry.values[j].refresh();
     150                                                std::cout << "DHT: Republished value. Refreshing TTL."
     151                                                        << std::endl;
     152                                                return;
     153                                        }
     154                                }
     155
     156                                // new value-> add to entry
     157                                std::cout << "DHT: Added value to "
     158                                        << " key=" << key << " with value=" << value << std::endl;
     159                                entry.values.push_back( ValueEntry( value ) );
     160                                entry.values.back().set_ttl(ttl);
     161                                return;
     162                        }
     163                }
     164
     165                // key is unknown-> add key value pair
     166                std::cout << "DHT: New key value pair "
     167                        << " key=" << key << " with value=" << value << std::endl;
     168
     169                // add new entry
     170                entries.push_back( DHTEntry() );
     171                DHTEntry& entry = entries.back();
     172                entry.key = key.clone();
     173                entry.values.push_back( ValueEntry(value) );
     174                entry.values.back().set_ttl(ttl);
     175        }
     176
     177        vector<Data> get( const Data& key ) {
     178                // find entry
     179                for (size_t i=0; i<entries.size(); i++) {
     180                        DHTEntry& entry = entries.at(i);
     181                        if ( equals(entry.key,key) )
     182                                return entry.get_values();
     183                }
     184                return vector<Data>();
     185        }
     186
     187        bool remove( const Data& key ) {
     188                // find entry
     189                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     190                        DHTEntry& entry = *i;
     191
     192                        // found? yes-> delete entry
     193                        if ( equals(entry.key, key) ) {
     194                                i = entries.erase(i);
     195                                return true;
     196                        }
     197                }
     198                return false;
     199        }
     200
     201        bool remove( const Data& key, const Data& value ) {
     202                // find entry
     203                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     204                        DHTEntry& entry = *i;
     205
     206                        // found? yes-> try to find value
     207                        if ( equals(entry.key, key) ) {
     208                                for (Values::iterator j = entry.values.begin();
     209                                                j != entry.values.end(); j++) {
     210
     211                                        // value found? yes-> delete
     212                                        if (equals(j->get_value(), value)) {
     213                                                j = entry.values.erase(j);
     214                                                return true;
     215                                        }
     216                                }
     217                        }
     218                }
     219                return false;
     220        }
     221
     222        void cleanup() {
     223                // find entry
     224                for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
     225                        DHTEntry& entry = *i;
     226
     227                        for (Values::iterator j = entry.values.begin();
     228                                        j != entry.values.end(); j++) {
     229
     230                                // value found? yes-> delete
     231                                if (j->is_ttl_elapsed())
     232                                        j = entry.values.erase(j);
     233                        }
     234
     235                        if (entry.values.size()==0) i = entries.erase(i);
     236                }
     237        }
     238};
     239
     240// ----------------------------------------------------------------------------
    60241
    61242/* *****************************************************************************
     
    545726        spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
    546727        sideport(&SideportListener::DEFAULT), started(false), counter(0) {
     728        dht = new DHT();
    547729}
    548730
    549731BaseOverlay::~BaseOverlay() {
     732        delete dht;
    550733}
    551734
     
    15821765        overlayMsg->addRouteRecord(nodeId);
    15831766
     1767        // handle dht messages (do not route)
     1768        if (overlayMsg->isDHTMessage())
     1769                return handleDHTMessage(overlayMsg);
     1770
    15841771        // handle signaling messages (do not route!)
    15851772        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
     
    16011788                return true;
    16021789        }
     1790
     1791        // handle DHT response messages
     1792        if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
     1793                bool ret = handleDHTMessage(overlayMsg);
     1794                delete overlayMsg;
     1795                return ret;
     1796        }
     1797
    16031798
    16041799        // handle base overlay message
     
    16991894}
    17001895
     1896
     1897// ----------------------------------------------------------------------------
     1898
     1899
     1900
     1901/// stabilize DHT state
     1902void BaseOverlay::stabilizeDHT() {
     1903
     1904}
     1905
     1906// handle DHT messages
     1907bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
     1908
     1909        // decapsulate message
     1910        logging_debug("received DHT message");
     1911        DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
     1912
     1913        // handle DHT data message
     1914        if (msg->getType()==OverlayMsg::typeDHTData) {
     1915                const ServiceID& service = msg->getService();
     1916                logging_debug( "Received DHT data for service " << service.toString() );
     1917
     1918                // delegate data message
     1919                getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
     1920                return true;
     1921        }
     1922
     1923        // route message to closest node
     1924        if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
     1925                logging_debug("Routing DHT message to closest node "
     1926                        << " from " << msg->getSourceNode()
     1927                        << " to " << msg->getDestinationNode()
     1928                );
     1929                route( msg );
     1930                delete msg;
     1931                return true;
     1932        }
     1933
     1934        // now, we are the closest node...
     1935        switch (msg->getType()) {
     1936        case OverlayMsg::typeDHTPut: {
     1937                BOOST_FOREACH( Data value, dhtMsg->getValues() )
     1938                        dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
     1939                break;
     1940        }
     1941
     1942        case OverlayMsg::typeDHTGet: {
     1943                vector<Data> vect = dht->get(dhtMsg->getKey());
     1944                OverlayMsg omsg(*msg);
     1945                omsg.swapRoles();
     1946                omsg.setType(OverlayMsg::typeDHTData);
     1947                DHTMessage dhtmsg(dhtMsg->getKey(), vect);
     1948                omsg.encapsulate(&dhtmsg);
     1949                this->send(&omsg, omsg.getDestinationNode());
     1950                break;
     1951        }
     1952
     1953        case OverlayMsg::typeDHTRemove: {
     1954                if (dhtMsg->hasValues()) {
     1955                        BOOST_FOREACH( Data value, dhtMsg->getValues() )
     1956                                        dht->remove(dhtMsg->getKey(), value );
     1957                } else
     1958                        dht->remove( dhtMsg->getKey() );
     1959                break;
     1960        }
     1961
     1962        default:
     1963                logging_error("DHT Message type unknown.");
     1964                return false;
     1965        }
     1966        delete msg;
     1967        return true;
     1968}
     1969
     1970/// put a value to the DHT with a ttl given in seconds
     1971void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) {
     1972        // calculate hash
     1973        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1974        OverlayMsg msg(OverlayMsg::typeDHTPut);
     1975        DHTMessage dhtmsg(key,value);
     1976        dhtmsg.setTTL(ttl);
     1977        msg.setDestinationNode(dest);
     1978        msg.encapsulate( &dhtmsg );
     1979        send(&msg, dest);
     1980}
     1981
     1982/// removes a key value pair from the DHT
     1983void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
     1984        // calculate hash
     1985        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1986        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     1987        DHTMessage dhtmsg(key,value);
     1988        msg.setDestinationNode(dest);
     1989        msg.encapsulate( &dhtmsg );
     1990        send(&msg, dest);
     1991}
     1992
     1993/// removes all data stored at the given key
     1994void BaseOverlay::dhtRemove( const Data& key ) {
     1995        // calculate hash
     1996        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     1997        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     1998        DHTMessage dhtmsg(key);
     1999        msg.setDestinationNode(dest);
     2000        msg.encapsulate( &dhtmsg );
     2001        send(&msg, dest);
     2002}
     2003
     2004/// requests data stored using key
     2005void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
     2006        // calculate hash
     2007        NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
     2008        OverlayMsg msg(OverlayMsg::typeDHTRemove);
     2009        DHTMessage dhtmsg(key);
     2010        msg.setDestinationNode(dest);
     2011        msg.setService(service);
     2012        msg.encapsulate( &dhtmsg );
     2013        send(&msg, dest);
     2014}
     2015
     2016
    17012017}} // namespace ariba, overlay
Note: See TracChangeset for help on using the changeset viewer.