Changeset 10653 for source/ariba/overlay
- Timestamp:
- Jul 25, 2012, 11:41:36 AM (12 years ago)
- Location:
- source/ariba/overlay
- Files:
-
- 2 deleted
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/BaseOverlay.cpp
r10576 r10653 51 51 52 52 #include "ariba/overlay/messages/OverlayMsg.h" 53 #include "ariba/overlay/messages/DHTMessage.h"54 53 #include "ariba/overlay/messages/JoinRequest.h" 55 54 #include "ariba/overlay/messages/JoinReply.h" … … 66 65 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 67 66 68 class ValueEntry {69 public:70 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),71 last_change(time(NULL)), value(value.clone()) {72 }73 74 ValueEntry( const ValueEntry& value ) :75 ttl(value.ttl), last_update(value.last_update),76 last_change(value.last_change), value(value.value.clone()) {77 }78 79 ~ValueEntry() {80 value.release();81 }82 83 void refresh() {84 last_update = time(NULL);85 }86 87 void set_value( const Data& value ) {88 this->value.release();89 this->value = value.clone();90 this->last_change = time(NULL);91 this->last_update = time(NULL);92 }93 94 Data get_value() const {95 return value;96 }97 98 uint16_t get_ttl() const {99 return ttl;100 }101 102 void set_ttl( uint16_t ttl ) {103 this->ttl = ttl;104 }105 106 bool is_ttl_elapsed() const {107 // is persistent? yes-> always return false108 if (ttl==0) return false;109 // return true, if ttl is elapsed110 return ( difftime( time(NULL), this->last_update ) > ttl );111 }112 113 private:114 uint16_t ttl;115 time_t last_update;116 time_t last_change;117 Data value;118 };119 120 class DHTEntry {121 public:122 Data key;123 vector<ValueEntry> values;124 125 vector<Data> get_values() {126 vector<Data> vect;127 BOOST_FOREACH( ValueEntry& e, values )128 vect.push_back( e.get_value() );129 return vect;130 }131 132 void erase_expired_entries() {133 for (vector<ValueEntry>::iterator i = values.begin();134 i != values.end(); i++ )135 if (i->is_ttl_elapsed())136 i = values.erase(i)-1;137 }138 };139 140 class DHT {141 public:142 typedef vector<DHTEntry> Entries;143 typedef vector<ValueEntry> Values;144 Entries entries;145 static const bool verbose = false;146 147 static bool equals( const Data& lhs, const Data& rhs ) {148 if (rhs.getLength()!=lhs.getLength()) return false;149 for (size_t i=0; i<lhs.getLength()/8; i++)150 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;151 return true;152 }153 154 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {155 cleanup();156 157 // find entry158 for (size_t i=0; i<entries.size(); i++) {159 DHTEntry& entry = entries.at(i);160 161 // check if key is already known162 if ( equals(entry.key, key) ) {163 164 // check if value is already in values list165 for (size_t j=0; j<entry.values.size(); j++) {166 // found value already? yes-> refresh ttl167 if ( equals(entry.values[j].get_value(), value) ) {168 entry.values[j].refresh();169 if (verbose)170 std::cout << "DHT: Republished value. Refreshing value timestamp."171 << std::endl;172 return;173 }174 }175 176 // new value-> add to entry177 if (verbose)178 std::cout << "DHT: Added value to "179 << " key=" << key << " with value=" << value << std::endl;180 entry.values.push_back( ValueEntry( value ) );181 entry.values.back().set_ttl(ttl);182 return;183 }184 }185 186 // key is unknown-> add key value pair187 if (verbose)188 std::cout << "DHT: New key value pair "189 << " key=" << key << " with value=" << value << std::endl;190 191 // add new entry192 entries.push_back( DHTEntry() );193 DHTEntry& entry = entries.back();194 entry.key = key.clone();195 entry.values.push_back( ValueEntry(value) );196 entry.values.back().set_ttl(ttl);197 }198 199 vector<Data> get( const Data& key ) {200 cleanup();201 // find entry202 for (size_t i=0; i<entries.size(); i++) {203 DHTEntry& entry = entries.at(i);204 if ( equals(entry.key,key) )205 return entry.get_values();206 }207 return vector<Data>();208 }209 210 bool remove( const Data& key ) {211 cleanup();212 213 // find entry214 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {215 DHTEntry& entry = *i;216 217 // found? yes-> delete entry218 if ( equals(entry.key, key) ) {219 entries.erase(i);220 return true;221 }222 }223 return false;224 }225 226 bool remove( const Data& key, const Data& value ) {227 cleanup();228 // find entry229 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {230 DHTEntry& entry = *i;231 232 // found? yes-> try to find value233 if ( equals(entry.key, key) ) {234 for (Values::iterator j = entry.values.begin();235 j != entry.values.end(); j++) {236 237 // value found? yes-> delete238 if (equals(j->get_value(), value)) {239 j = entry.values.erase(j)-1;240 return true;241 }242 }243 }244 }245 return false;246 }247 248 void cleanup() {249 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {250 DHTEntry& entry = *i;251 entry.erase_expired_entries();252 if (entry.values.size()==0)253 i = entries.erase(i)-1;254 }255 }256 };257 67 258 68 // ---------------------------------------------------------------------------- … … 758 568 sideport(&SideportListener::DEFAULT), overlayInterface(NULL), 759 569 counter(0) { 760 initDHT();761 570 } 762 571 763 572 BaseOverlay::~BaseOverlay() { 764 destroyDHT();765 573 } 766 574 … … 1078 886 } 1079 887 888 1080 889 seqnum_t BaseOverlay::sendMessage(const Message* message, 1081 890 const NodeID& node, const ServiceID& service) { … … 1114 923 } 1115 924 925 926 NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message, 927 const NodeID& address, const ServiceID& service) { 928 929 if ( overlayInterface->isClosestNodeTo(address) ) 930 { 931 return NodeID::UNSPECIFIED; 932 } 933 934 const NodeID& closest_node = overlayInterface->getNextNodeId(address); 935 936 if ( closest_node != NodeID::UNSPECIFIED ) 937 { 938 seqnum_t seqnum = sendMessage(message, closest_node, service); 939 } 940 941 return closest_node; // XXX return seqnum ?? tuple? closest_node via (non const) reference? 942 } 1116 943 // ---------------------------------------------------------------------------- 1117 944 … … 1831 1658 overlayMsg->addRouteRecord(nodeId); 1832 1659 1833 // handle dht messages (do not route)1834 if (overlayMsg->isDHTMessage()) {1835 bool ret = handleDHTMessage(overlayMsg);1836 delete overlayMsg;1837 return ret;1838 }1839 1840 1660 // handle signaling messages (do not route!) 1841 1661 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && … … 1856 1676 delete overlayMsg; 1857 1677 return true; 1858 }1859 1860 // handle DHT response messages1861 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {1862 bool ret = handleDHTMessage(overlayMsg);1863 delete overlayMsg;1864 return ret;1865 1678 } 1866 1679 … … 1964 1777 stabilizeRelays(); 1965 1778 stabilizeLinks(); 1966 stabilizeDHT();1967 1779 updateVisual(); 1968 1780 } … … 2110 1922 // ---------------------------------------------------------------------------- 2111 1923 2112 void BaseOverlay::initDHT() {2113 dht = new DHT();2114 localDHT = new DHT();2115 republishCounter = 0;2116 }2117 2118 void BaseOverlay::destroyDHT() {2119 delete dht;2120 delete localDHT;2121 }2122 2123 /// stabilize DHT state2124 void BaseOverlay::stabilizeDHT() {2125 2126 // do refresh every 2 seconds2127 if (republishCounter < 2) {2128 republishCounter++;2129 return;2130 }2131 republishCounter = 0;2132 2133 // remove old values from DHT2134 BOOST_FOREACH( DHTEntry& entry, dht->entries ) {2135 // erase old entries2136 entry.erase_expired_entries();2137 }2138 2139 // re-publish values-> do not refresh locally stored values2140 BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {2141 BOOST_FOREACH( ValueEntry& value, entry.values )2142 dhtPut(entry.key, value.get_value(), value.get_ttl(), false, true );2143 }2144 }2145 2146 // handle DHT messages2147 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {2148 2149 // de-capsulate message2150 logging_debug("Received DHT message");2151 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();2152 2153 // handle DHT data message2154 if (msg->getType()==OverlayMsg::typeDHTData) {2155 const ServiceID& service = msg->getService();2156 logging_info( "Received DHT data for service " << service.toString() );2157 2158 // delegate data message2159 CommunicationListener* lst = getListener(service);2160 if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );2161 delete dhtMsg;2162 return true;2163 }2164 2165 // route message to closest node2166 if (!overlayInterface->isClosestNodeTo(dhtMsg->getHashedKey())) {2167 logging_debug("Routing DHT message to closest node "2168 << " from " << msg->getSourceNode()2169 << " to " << dhtMsg->getHashedKey()2170 );2171 dhtSend(msg, dhtMsg->getHashedKey());2172 delete dhtMsg;2173 return true;2174 }2175 2176 // now, we are the closest node...2177 switch (msg->getType()) {2178 2179 // ----------------------------------------------------------------- put ---2180 case OverlayMsg::typeDHTPut: {2181 logging_debug("DHT-Put: Attempt to store values for key "2182 << dhtMsg->getKey());2183 if (dhtMsg->doReplace()) {2184 logging_debug("DHT-Put: Attempt to replace key: remove old values first!");2185 dht->remove(dhtMsg->getKey());2186 }2187 BOOST_FOREACH( Data value, dhtMsg->getValues() ) {2188 logging_debug("DHT-Put: Stored value: " << value );2189 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );2190 }2191 break;2192 }2193 2194 // ----------------------------------------------------------------- get ---2195 case OverlayMsg::typeDHTGet: {2196 logging_info("DHT-Get: key=" << dhtMsg->getKey() );2197 vector<Data> vect = dht->get(dhtMsg->getKey());2198 BOOST_FOREACH(const Data& d, vect)2199 logging_info("DHT-Get: value=" << d);2200 OverlayMsg omsg(*msg);2201 omsg.swapRoles();2202 omsg.setType(OverlayMsg::typeDHTData);2203 DHTMessage dhtmsg(dhtMsg->getKey(), vect);2204 omsg.encapsulate(&dhtmsg);2205 2206 logging_info("Sending DHT response to " << omsg.getDestinationNode());2207 sendMessage(&omsg, omsg.getDestinationNode());2208 break;2209 }2210 2211 // -------------------------------------------------------------- remove ---2212 case OverlayMsg::typeDHTRemove: {2213 if (dhtMsg->hasValues()) {2214 BOOST_FOREACH( Data value, dhtMsg->getValues() )2215 dht->remove(dhtMsg->getKey(), value );2216 } else2217 dht->remove( dhtMsg->getKey() );2218 break;2219 }2220 2221 // -------------------------------------------------------------- default---2222 default:2223 logging_error("DHT Message type unknown.");2224 return false;2225 }2226 delete dhtMsg;2227 return true;2228 }2229 2230 /// put a value to the DHT with a ttl given in seconds2231 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace, bool no_local_refresh ) {2232 2233 // log2234 logging_info("DHT-Put:"2235 << " key=" << key << " value=" << value2236 << " ttl=" << ttl << " replace=" << replace2237 );2238 2239 if (!no_local_refresh) {2240 2241 // put into local data store (for refreshes)2242 if (replace) localDHT->remove(key);2243 localDHT->put(key, value, ttl);2244 }2245 2246 DHTMessage dhtmsg( key, value );2247 dhtmsg.setReplace( replace );2248 dhtmsg.setTTL(ttl);2249 2250 OverlayMsg msg( OverlayMsg::typeDHTPut );2251 msg.encapsulate( &dhtmsg );2252 msg.setSourceNode(this->nodeId);2253 dhtSend(&msg, dhtmsg.getHashedKey());2254 }2255 2256 /// removes a key value pair from the DHT2257 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {2258 // remove from local data store2259 localDHT->remove(key,value);2260 2261 DHTMessage dhtmsg(key,value);2262 2263 // send message2264 OverlayMsg msg(OverlayMsg::typeDHTRemove);2265 msg.encapsulate( &dhtmsg );2266 msg.setSourceNode(this->nodeId);2267 dhtSend(&msg, dhtmsg.getHashedKey());2268 }2269 2270 /// removes all data stored at the given key2271 void BaseOverlay::dhtRemove( const Data& key ) {2272 // log: remove key2273 logging_info("DHT-Remove: Removing key=" << key );2274 2275 DHTMessage dhtmsg(key);2276 2277 // send message2278 OverlayMsg msg(OverlayMsg::typeDHTRemove);2279 msg.encapsulate( &dhtmsg );2280 msg.setSourceNode(this->nodeId);2281 dhtSend(&msg, dhtmsg.getHashedKey());2282 }2283 2284 /// requests data stored using key2285 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {2286 // log: get2287 logging_info("DHT-Get: Trying to resolve key=" <<2288 key << " for service=" << service.toString() );2289 2290 DHTMessage dhtmsg(key);2291 2292 // send message2293 OverlayMsg msg(OverlayMsg::typeDHTGet);2294 msg.setService(service);2295 msg.encapsulate( &dhtmsg );2296 msg.setSourceNode(this->nodeId);2297 dhtSend(&msg, dhtmsg.getHashedKey());2298 }2299 2300 void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {2301 // log: dht send2302 logging_info("DHT-Send: Sending message with key=" << dest.toString() );2303 2304 // local storage? yes-> put into DHT directly2305 if (overlayInterface->isClosestNodeTo(dest)) {2306 // be compatible with old code so set destination to hashed key2307 msg->setDestinationNode(dest);2308 2309 Data d = data_serialize(msg);2310 Message m2(d);2311 OverlayMsg* m3 = m2.decapsulate<OverlayMsg>();2312 2313 handleDHTMessage(m3);2314 2315 delete m3;2316 return;2317 } else {2318 // need to route2319 NodeID next_hop = overlayInterface->getNextNodeId(dest);2320 msg->setDestinationNode(next_hop);2321 logging_info("DHT-Send: sending via node " << next_hop.toString());2322 2323 send(msg, next_hop);2324 2325 return;2326 }2327 }2328 2329 1924 std::string BaseOverlay::debugInformation() { 2330 1925 std::stringstream s; -
source/ariba/overlay/BaseOverlay.h
r7532 r10653 74 74 using std::vector; 75 75 using std::list; 76 using std::cout;77 76 using std::map; 78 77 using std::make_pair; … … 187 186 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 188 187 188 /** 189 * send a message to the closest directly known node to an address 190 * 191 * @return NodeID of the (closest) destination node; 192 */ 193 NodeID sendMessageCloserToNodeID(const Message* message, const NodeID& address, 194 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 195 189 196 /** 190 197 * Send out a message to all nodes that are known in the overlay structure. … … 287 294 */ 288 295 void leaveSpoVNet(); 289 290 /// put a value to the DHT with a ttl given in seconds291 void dhtPut( const Data& key, const Data& value, int ttl = 0, bool replace = false, bool no_local_refresh = false);292 293 /// removes a key value pair from the DHT294 void dhtRemove( const Data& key, const Data& value );295 296 /// removes all data stored at the given key297 void dhtRemove( const Data& key );298 299 /// requests data stored using key300 void dhtGet( const Data& key, const ServiceID& service );301 296 302 297 protected: … … 411 406 bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink ); 412 407 bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink ); 413 414 // handle DHT messages415 bool handleDHTMessage( OverlayMsg* msg );416 408 417 409 // handle link messages … … 506 498 bool ignore_down = false ); 507 499 508 // distributed hashtable handling ------------------------------------------509 510 DHT* dht;511 DHT* localDHT;512 int republishCounter;513 514 void initDHT();515 void destroyDHT();516 void stabilizeDHT();517 void dhtSend( OverlayMsg* msg, const NodeID& dest );518 519 500 // misc -------------------------------------------------------------------- 520 501 -
source/ariba/overlay/messages/OverlayMsg.h
r6919 r10653 89 89 typeLinkAlive = 0x35, ///< keep-alive message 90 90 91 // DHT routed messages 91 /// DHT routed messages 92 /// @deprecated because the DHT has been moved into a separate service 92 93 maskDHT = 0x40, ///< bit mask for dht messages 93 94 typeDHTPut = 0x41, ///< DHT put operation … … 96 97 97 98 /// DHT response messages 99 /// @deprecated because the DHT has been moved into a separate service 98 100 maskDHTResponse = 0x50, ///< bit mask for dht responses 99 101 typeDHTData = 0x51, ///< DHT get data … … 197 199 } 198 200 199 bool isDHTMessage() const {200 return hasTypeMask(maskDHT);201 }202 203 201 /// number of hops and time to live ---------------------------------------- 204 202
Note:
See TracChangeset
for help on using the changeset viewer.