Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (12 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/overlay/BaseOverlay.cpp

    r10576 r10653  
    5151
    5252#include "ariba/overlay/messages/OverlayMsg.h"
    53 #include "ariba/overlay/messages/DHTMessage.h"
    5453#include "ariba/overlay/messages/JoinRequest.h"
    5554#include "ariba/overlay/messages/JoinReply.h"
     
    6665#define visualIdBase            ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
    6766
    68 class ValueEntry {
    69 public:
    70         ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
    71                 last_change(time(NULL)), value(value.clone()) {
    72         }
    73 
    74         ValueEntry( const ValueEntry& value ) :
    75                 ttl(value.ttl), last_update(value.last_update),
    76                 last_change(value.last_change), value(value.value.clone()) {
    77         }
    78 
    79         ~ValueEntry()  {
    80                 value.release();
    81         }
    82 
    83         void refresh() {
    84                 last_update = time(NULL);
    85         }
    86 
    87         void set_value( const Data& value ) {
    88                 this->value.release();
    89                 this->value = value.clone();
    90                 this->last_change = time(NULL);
    91                 this->last_update = time(NULL);
    92         }
    93 
    94         Data get_value() const {
    95                 return value;
    96         }
    97 
    98         uint16_t get_ttl() const {
    99                 return ttl;
    100         }
    101 
    102         void set_ttl( uint16_t ttl ) {
    103                 this->ttl = ttl;
    104         }
    105 
    106         bool is_ttl_elapsed() const {
    107                 // is persistent? yes-> always return false
    108                 if (ttl==0) return false;
    109                 // return true, if ttl is elapsed
    110                 return ( difftime( time(NULL), this->last_update ) > ttl );
    111         }
    112 
    113 private:
    114         uint16_t ttl;
    115         time_t last_update;
    116         time_t last_change;
    117         Data value;
    118 };
    119 
    120 class DHTEntry {
    121 public:
    122         Data key;
    123         vector<ValueEntry> values;
    124 
    125         vector<Data> get_values() {
    126                 vector<Data> vect;
    127                 BOOST_FOREACH( ValueEntry& e, values )
    128                         vect.push_back( e.get_value() );
    129                 return vect;
    130         }
    131 
    132         void erase_expired_entries() {
    133                 for (vector<ValueEntry>::iterator i = values.begin();
    134                                 i != values.end(); i++ )
    135                         if (i->is_ttl_elapsed())
    136                                 i = values.erase(i)-1;
    137         }
    138 };
    139 
    140 class DHT {
    141 public:
    142         typedef vector<DHTEntry> Entries;
    143         typedef vector<ValueEntry> Values;
    144         Entries entries;
    145         static const bool verbose = false;
    146 
    147         static bool equals( const Data& lhs, const Data& rhs ) {
    148                 if (rhs.getLength()!=lhs.getLength()) return false;
    149                 for (size_t i=0; i<lhs.getLength()/8; i++)
    150                         if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
    151                 return true;
    152         }
    153 
    154         void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
    155                 cleanup();
    156 
    157                 // find entry
    158                 for (size_t i=0; i<entries.size(); i++) {
    159                         DHTEntry& entry = entries.at(i);
    160 
    161                         // check if key is already known
    162                         if ( equals(entry.key, key) ) {
    163 
    164                                 // check if value is already in values list
    165                                 for (size_t j=0; j<entry.values.size(); j++) {
    166                                         // found value already? yes-> refresh ttl
    167                                         if ( equals(entry.values[j].get_value(), value) ) {
    168                                                 entry.values[j].refresh();
    169                                                 if (verbose)
    170                                                         std::cout << "DHT: Republished value. Refreshing value timestamp."
    171                                                                 << std::endl;
    172                                                 return;
    173                                         }
    174                                 }
    175 
    176                                 // new value-> add to entry
    177                                 if (verbose)
    178                                         std::cout << "DHT: Added value to "
    179                                                 << " key=" << key << " with value=" << value << std::endl;
    180                                 entry.values.push_back( ValueEntry( value ) );
    181                                 entry.values.back().set_ttl(ttl);
    182                                 return;
    183                         }
    184                 }
    185 
    186                 // key is unknown-> add key value pair
    187                 if (verbose)
    188                         std::cout << "DHT: New key value pair "
    189                                 << " key=" << key << " with value=" << value << std::endl;
    190 
    191                 // add new entry
    192                 entries.push_back( DHTEntry() );
    193                 DHTEntry& entry = entries.back();
    194                 entry.key = key.clone();
    195                 entry.values.push_back( ValueEntry(value) );
    196                 entry.values.back().set_ttl(ttl);
    197         }
    198 
    199         vector<Data> get( const Data& key ) {
    200                 cleanup();
    201                 // find entry
    202                 for (size_t i=0; i<entries.size(); i++) {
    203                         DHTEntry& entry = entries.at(i);
    204                         if ( equals(entry.key,key) )
    205                                 return entry.get_values();
    206                 }
    207                 return vector<Data>();
    208         }
    209 
    210         bool remove( const Data& key ) {
    211                 cleanup();
    212 
    213                 // find entry
    214                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    215                         DHTEntry& entry = *i;
    216 
    217                         // found? yes-> delete entry
    218                         if ( equals(entry.key, key) ) {
    219                                 entries.erase(i);
    220                                 return true;
    221                         }
    222                 }
    223                 return false;
    224         }
    225 
    226         bool remove( const Data& key, const Data& value ) {
    227                 cleanup();
    228                 // find entry
    229                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    230                         DHTEntry& entry = *i;
    231 
    232                         // found? yes-> try to find value
    233                         if ( equals(entry.key, key) ) {
    234                                 for (Values::iterator j = entry.values.begin();
    235                                                 j != entry.values.end(); j++) {
    236 
    237                                         // value found? yes-> delete
    238                                         if (equals(j->get_value(), value)) {
    239                                                 j = entry.values.erase(j)-1;
    240                                                 return true;
    241                                         }
    242                                 }
    243                         }
    244                 }
    245                 return false;
    246         }
    247 
    248         void cleanup() {
    249                 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
    250                         DHTEntry& entry = *i;
    251                         entry.erase_expired_entries();
    252                         if (entry.values.size()==0)
    253                                 i = entries.erase(i)-1;
    254                 }
    255         }
    256 };
    25767
    25868// ----------------------------------------------------------------------------
     
    758568                        sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
    759569                        counter(0) {
    760         initDHT();
    761570}
    762571
    763572BaseOverlay::~BaseOverlay() {
    764         destroyDHT();
    765573}
    766574
     
    1078886}
    1079887
     888
    1080889seqnum_t BaseOverlay::sendMessage(const Message* message,
    1081890                const NodeID& node, const ServiceID& service) {
     
    1114923}
    1115924
     925
     926NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
     927        const NodeID& address, const ServiceID& service) {
     928   
     929    if ( overlayInterface->isClosestNodeTo(address) )
     930    {
     931        return NodeID::UNSPECIFIED;
     932    }
     933       
     934    const NodeID& closest_node = overlayInterface->getNextNodeId(address);
     935   
     936    if ( closest_node != NodeID::UNSPECIFIED )
     937    {
     938        seqnum_t seqnum = sendMessage(message, closest_node, service);
     939    }
     940   
     941    return closest_node;  // XXX return seqnum ?? tuple? closest_node via (non const) reference?
     942}
    1116943// ----------------------------------------------------------------------------
    1117944
     
    18311658        overlayMsg->addRouteRecord(nodeId);
    18321659
    1833         // handle dht messages (do not route)
    1834         if (overlayMsg->isDHTMessage()) {
    1835                 bool ret = handleDHTMessage(overlayMsg);
    1836                 delete overlayMsg;
    1837                 return ret;
    1838         }
    1839 
    18401660        // handle signaling messages (do not route!)
    18411661        if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
     
    18561676                delete overlayMsg;
    18571677                return true;
    1858         }
    1859 
    1860         // handle DHT response messages
    1861         if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
    1862                 bool ret = handleDHTMessage(overlayMsg);
    1863                 delete overlayMsg;
    1864                 return ret;
    18651678        }
    18661679
     
    19641777        stabilizeRelays();
    19651778        stabilizeLinks();
    1966         stabilizeDHT();
    19671779        updateVisual();
    19681780}
     
    21101922// ----------------------------------------------------------------------------
    21111923
    2112 void BaseOverlay::initDHT() {
    2113         dht = new DHT();
    2114         localDHT = new DHT();
    2115         republishCounter = 0;
    2116 }
    2117 
    2118 void BaseOverlay::destroyDHT() {
    2119         delete dht;
    2120         delete localDHT;
    2121 }
    2122 
    2123 /// stabilize DHT state
    2124 void BaseOverlay::stabilizeDHT() {
    2125 
    2126         // do refresh every 2 seconds
    2127         if (republishCounter < 2) {
    2128                 republishCounter++;
    2129                 return;
    2130         }
    2131         republishCounter = 0;
    2132 
    2133         // remove old values from DHT
    2134         BOOST_FOREACH( DHTEntry& entry, dht->entries ) {
    2135                 // erase old entries
    2136                 entry.erase_expired_entries();
    2137         }
    2138 
    2139         // re-publish values-> do not refresh locally stored values
    2140         BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {
    2141                 BOOST_FOREACH( ValueEntry& value, entry.values )
    2142                         dhtPut(entry.key, value.get_value(), value.get_ttl(), false, true );
    2143         }
    2144 }
    2145 
    2146 // handle DHT messages
    2147 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
    2148 
    2149         // de-capsulate message
    2150         logging_debug("Received DHT message");
    2151         DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
    2152 
    2153         // handle DHT data message
    2154         if (msg->getType()==OverlayMsg::typeDHTData) {
    2155                 const ServiceID& service = msg->getService();
    2156                 logging_info( "Received DHT data for service " << service.toString() );
    2157 
    2158                 // delegate data message
    2159                 CommunicationListener* lst = getListener(service);
    2160                 if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
    2161                 delete dhtMsg;
    2162                 return true;
    2163         }
    2164 
    2165         // route message to closest node
    2166         if (!overlayInterface->isClosestNodeTo(dhtMsg->getHashedKey())) {
    2167                 logging_debug("Routing DHT message to closest node "
    2168                         << " from " << msg->getSourceNode()
    2169                         << " to " << dhtMsg->getHashedKey()
    2170                 );
    2171                 dhtSend(msg, dhtMsg->getHashedKey());
    2172                 delete dhtMsg;
    2173                 return true;
    2174         }
    2175 
    2176         // now, we are the closest node...
    2177         switch (msg->getType()) {
    2178 
    2179         // ----------------------------------------------------------------- put ---
    2180         case OverlayMsg::typeDHTPut: {
    2181                 logging_debug("DHT-Put: Attempt to store values for key "
    2182                                 << dhtMsg->getKey());
    2183                 if (dhtMsg->doReplace()) {
    2184                         logging_debug("DHT-Put: Attempt to replace key: remove old values first!");
    2185                         dht->remove(dhtMsg->getKey());
    2186                 }
    2187                 BOOST_FOREACH( Data value, dhtMsg->getValues() ) {
    2188                         logging_debug("DHT-Put: Stored value: " << value );
    2189                         dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
    2190                 }
    2191                 break;
    2192         }
    2193 
    2194         // ----------------------------------------------------------------- get ---
    2195         case OverlayMsg::typeDHTGet: {
    2196                 logging_info("DHT-Get: key=" << dhtMsg->getKey() );
    2197                 vector<Data> vect = dht->get(dhtMsg->getKey());
    2198                 BOOST_FOREACH(const Data& d, vect)
    2199                         logging_info("DHT-Get: value=" << d);
    2200                 OverlayMsg omsg(*msg);
    2201                 omsg.swapRoles();
    2202                 omsg.setType(OverlayMsg::typeDHTData);
    2203                 DHTMessage dhtmsg(dhtMsg->getKey(), vect);
    2204                 omsg.encapsulate(&dhtmsg);
    2205                
    2206                 logging_info("Sending DHT response to " << omsg.getDestinationNode());
    2207                 sendMessage(&omsg, omsg.getDestinationNode());
    2208                 break;
    2209         }
    2210 
    2211         // -------------------------------------------------------------- remove ---
    2212         case OverlayMsg::typeDHTRemove: {
    2213                 if (dhtMsg->hasValues()) {
    2214                         BOOST_FOREACH( Data value, dhtMsg->getValues() )
    2215                                                         dht->remove(dhtMsg->getKey(), value );
    2216                 } else
    2217                         dht->remove( dhtMsg->getKey() );
    2218                 break;
    2219         }
    2220 
    2221         // -------------------------------------------------------------- default---
    2222         default:
    2223                 logging_error("DHT Message type unknown.");
    2224                 return false;
    2225         }
    2226         delete dhtMsg;
    2227         return true;
    2228 }
    2229 
    2230 /// put a value to the DHT with a ttl given in seconds
    2231 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace, bool no_local_refresh ) {
    2232 
    2233         // log
    2234         logging_info("DHT-Put:"
    2235                 << " key=" << key << " value=" << value
    2236                 << " ttl=" << ttl << " replace=" << replace
    2237         );
    2238 
    2239         if (!no_local_refresh) {
    2240 
    2241                 // put into local data store (for refreshes)
    2242                 if (replace) localDHT->remove(key);
    2243                 localDHT->put(key, value, ttl);
    2244         }
    2245 
    2246         DHTMessage dhtmsg( key, value );
    2247         dhtmsg.setReplace( replace );
    2248         dhtmsg.setTTL(ttl);
    2249 
    2250         OverlayMsg msg( OverlayMsg::typeDHTPut );
    2251         msg.encapsulate( &dhtmsg );
    2252         msg.setSourceNode(this->nodeId);
    2253         dhtSend(&msg, dhtmsg.getHashedKey());
    2254 }
    2255 
    2256 /// removes a key value pair from the DHT
    2257 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
    2258         // remove from local data store
    2259         localDHT->remove(key,value);
    2260 
    2261         DHTMessage dhtmsg(key,value);
    2262 
    2263         // send message
    2264         OverlayMsg msg(OverlayMsg::typeDHTRemove);
    2265         msg.encapsulate( &dhtmsg );
    2266         msg.setSourceNode(this->nodeId);
    2267         dhtSend(&msg, dhtmsg.getHashedKey());
    2268 }
    2269 
    2270 /// removes all data stored at the given key
    2271 void BaseOverlay::dhtRemove( const Data& key ) {
    2272         // log: remove key
    2273         logging_info("DHT-Remove: Removing key=" << key );
    2274 
    2275         DHTMessage dhtmsg(key);
    2276 
    2277         // send message
    2278         OverlayMsg msg(OverlayMsg::typeDHTRemove);
    2279         msg.encapsulate( &dhtmsg );
    2280         msg.setSourceNode(this->nodeId);
    2281         dhtSend(&msg, dhtmsg.getHashedKey());
    2282 }
    2283 
    2284 /// requests data stored using key
    2285 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
    2286         // log: get
    2287         logging_info("DHT-Get: Trying to resolve key=" <<
    2288                         key << " for service=" << service.toString() );
    2289 
    2290         DHTMessage dhtmsg(key);
    2291 
    2292         // send message
    2293         OverlayMsg msg(OverlayMsg::typeDHTGet);
    2294         msg.setService(service);
    2295         msg.encapsulate( &dhtmsg );
    2296         msg.setSourceNode(this->nodeId);
    2297         dhtSend(&msg, dhtmsg.getHashedKey());
    2298 }
    2299 
    2300 void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {
    2301         // log: dht send
    2302         logging_info("DHT-Send: Sending message with key=" << dest.toString() );
    2303 
    2304         // local storage? yes-> put into DHT directly
    2305         if (overlayInterface->isClosestNodeTo(dest)) {
    2306                 // be compatible with old code so set destination to hashed key
    2307                 msg->setDestinationNode(dest);
    2308                
    2309                 Data d = data_serialize(msg);
    2310                 Message m2(d);
    2311                 OverlayMsg* m3 = m2.decapsulate<OverlayMsg>();
    2312                
    2313                 handleDHTMessage(m3);
    2314                
    2315                 delete m3;
    2316                 return;
    2317         } else {
    2318                 // need to route
    2319                 NodeID next_hop = overlayInterface->getNextNodeId(dest);
    2320                 msg->setDestinationNode(next_hop);
    2321                 logging_info("DHT-Send: sending via node " << next_hop.toString());
    2322                
    2323                 send(msg, next_hop);
    2324                
    2325                 return;
    2326         }
    2327 }
    2328 
    23291924std::string BaseOverlay::debugInformation() {
    23301925        std::stringstream s;
Note: See TracChangeset for help on using the changeset viewer.