close Warning: Can't use blame annotator:
No changeset 10577 in the repository

source: source/services/dht/Dht.cpp@ 10673

Last change on this file since 10673 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 12.3 KB
RevLine 
1/*
2 * Dht.cpp
3 *
4 * Created on: 20.06.2012
5 * Author: mario
6 */
7
8#include "Dht.h"
9#include <boost/date_time/time_clock.hpp>
10
11namespace ariba_service {
12namespace dht {
13
14use_logging_cpp(Dht)
15
16using namespace std;
17using boost::date_time::second_clock;
18using boost::posix_time::ptime;
19
20SystemEventType DhtRepublishEvent("DhtRepublishEvent");
21SystemEventType DhtCleanupEvent("DhtCleanupEvent");
22
23
24Dht::Dht(ariba::ServiceID serviceID, ariba::Node* node) :
25 serviceID(serviceID),
26 node(node),
27 cleanup_running(false),
28 listener(NULL)
29{
30 this->node->bind(this, serviceID);
31}
32
33Dht::~Dht()
34{
35 this->node->unbind(this, serviceID);
36}
37
38
39
40void Dht::put(const std::string& key, const std::string& value, uint16_t ttl)
41{
42 DhtMessage msg(DhtMessage::DhtPut, key, value, ttl);
43
44 handle_dht_message(msg, NodeID::UNSPECIFIED);
45}
46
47void Dht::get(const std::string& key)
48{
49 DhtMessage msg(DhtMessage::DhtGet, key);
50
51 handle_dht_message(msg, NodeID::UNSPECIFIED);
52}
53
54void Dht::atomic_put_and_get(const std::string& key, const std::string& value, uint16_t ttl)
55{
56 DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, ttl);
57
58 handle_dht_message(msg, NodeID::UNSPECIFIED);
59}
60
61void Dht::meet(const std::string& key, const std::string& value, uint16_t ttl_in_sec)
62{
63 // insert into meet_store
64 insert_into_table(meet_store,
65 key,
66 std::vector<std::string>(1, value),
67 ttl_in_sec);
68
69 // send message (and program republishing)
70 send_meet_message(key, value);
71}
72
73void Dht::stop_meet(const std::string& key, const std::string& value)
74{
75 remove_from_table(meet_store,
76 key,
77 std::vector<std::string>(1, value));
78}
79
80void Dht::remove(const std::string& key, const std::string& value)
81{
82 // send delete message
83 DhtMessage msg(DhtMessage::DhtRemove, key, value);
84
85 handle_dht_message(msg, NodeID::UNSPECIFIED);
86}
87
88
89
90
91bool Dht::add_listener(DhtAnswerInterface* new_listener)
92{
93 if ( listener == NULL )
94 {
95 listener = new_listener;
96
97 return true;
98 }
99 else
100 {
101 return false;
102 }
103}
104
105bool Dht::remove_listener(DhtAnswerInterface* new_listener)
106{
107 if (listener == new_listener) {
108 listener = NULL;
109 return true;
110
111 } else {
112 return false;
113 }
114}
115
116
117
118
119//** PRIVATE FUNCTIONS **//
120
121void Dht::handle_dht_message(const DhtMessage& message, const NodeID& source)
122{
123 // send message closer to hashed key
124 NodeID addr = message.getHashedKey();
125
126 logging_debug("Processing DHT message...");
127
128 logging_debug("Dest Addr: " << addr.toString());
129
130 // * send closer, if possible *
131 const ariba::NodeID dest = node->sendMessageCloserToNodeID(message, addr, this->serviceID);
132
133 logging_debug("Closer Node: " << dest.toString());
134
135 // couldn't send closer, so we are the closest node
136 // ---> * handle dht request * (store value, etc.)
137 if ( dest == NodeID::UNSPECIFIED )
138 {
139 logging_debug("DHT: We are the closest node!");
140
141 switch (message.getType())
142 {
143 case DhtMessage::DhtPut:
144 {
145 insert_into_table(
146 table,
147 message.getKey(),
148 message.getValues(),
149 message.getTTL());
150
151 break;
152 }
153
154 case DhtMessage::DhtGet:
155 {
156 answer_dht_request(message.getKey(), source);
157
158 break;
159 }
160
161 case DhtMessage::DhtPutAndGet:
162 {
163 insert_into_table(
164 table,
165 message.getKey(),
166 message.getValues(),
167 message.getTTL());
168 answer_dht_request(message.getKey(), source);
169
170 break;
171 }
172
173 case DhtMessage::DhtRemove:
174 {
175 remove_from_table(table, message.getKey(), message.getValues());
176
177 break;
178 }
179 }
180 }
181}
182
183
184void Dht::insert_into_table(DhtTableType& table,
185 const std::string& key,
186 const vector<std::string>& values,
187 uint16_t ttl)
188{
189 DhtTableType::mapped_type& value_entries = table[key];
190
191 BOOST_FOREACH(const std::string& value, values) {
192
193 // Debug output
194 logging_info("DHT: Inserting (" << key << ", " << value << ")");
195
196 // push the value for the given key (into the vector)
197 bool entry_updated = false;
198 for (
199 DhtTableType::mapped_type::iterator position = value_entries.begin();
200 position != value_entries.end();
201 ++position)
202 {
203 if (position->get_value() == value) {
204 position->set_ttl(ttl);
205 entry_updated = true;
206 break;
207 }
208 }
209
210 if (!entry_updated) {
211 value_entries.push_back(ValueEntry(value, ttl));
212 }
213 }
214
215 schedule_cleanup_event();
216}
217
218
219void Dht::remove_from_table(DhtTableType& table,
220 const std::string& key,
221 const vector<std::string>& values)
222{
223 logging_debug("DHT: trying to delete some values for key " << key);
224 // find key
225 DhtTableType::iterator key_position = table.find(key);
226 if (key_position == table.end()) {
227 return;
228 }
229
230 // delete values from set of values
231 DhtTableType::mapped_type& entries = key_position->second;
232 BOOST_FOREACH(const std::string& value, values) {
233 for (
234 DhtTableType::mapped_type::iterator entry = entries.begin();
235 entry != entries.end();
236 ++entry)
237 {
238 if (entry->get_value() == value) {
239 logging_info("DHT: Deleting "
240 "(" <<key << ", " << entry->get_value() << ")");
241 entries.erase(entry);
242 break;
243 }
244 }
245 }
246
247 // the key could empty now
248 // ---> remove it
249 if ( entries.size() == 0 )
250 {
251 table.erase(key_position);
252 }
253}
254
255
256void Dht::cleanup_table(DhtTableType& table)
257{
258 logging_debug("DHT: cleaning up table");
259
260 vector<std::string> to_be_deleted;
261
262 for (
263 DhtTableType::iterator position = table.begin();
264 position != table.end();
265 ++position)
266 {
267 cleanup_entries(position->second);
268
269 // mark entry container for removal if empty
270 if (position->second.size() == 0) {
271 to_be_deleted.push_back(position->first);
272 }
273 }
274
275 BOOST_FOREACH(const std::string& key, to_be_deleted) {
276 table.erase(key);
277 }
278}
279
280void Dht::cleanup_entries(DhtTableType::mapped_type& entries)
281{
282 DhtTableType::mapped_type::iterator position = entries.begin();
283 while (position != entries.end()) {
284
285 if (position->is_ttl_elapsed()) {
286 // remove stale entry
287 position = entries.erase(position);
288
289 } else {
290 // move on otherwise
291 ++position;
292 }
293 }
294}
295
296
297void Dht::answer_dht_request(const std::string& key, const NodeID& source)
298{
299 // get entries from table
300 const DhtTableType::mapped_type& entries = table[key];
301
302 // need to convert value entries to strings
303 vector<std::string> values;
304 values.reserve(entries.size());
305 BOOST_FOREACH(const ValueEntry& entry, entries) {
306
307 if (!entry.is_ttl_elapsed()) {
308 values.push_back(entry.get_value());
309 }
310
311 }
312
313 // BRANCH: request comes from another node
314 // ---> send answer message
315 if ( source != NodeID::UNSPECIFIED )
316 {
317 // create answer message
318 DhtMessage msg(DhtMessage::DhtAnswer, key, values);
319
320 // * send answer *
321 node->sendMessage(msg, source, serviceID);
322 }
323
324 // BRANCH: local request
325 // ---> inform listeners directly (TODO code duplicates...)
326 else
327 {
328 logging_debug("DHT: Answering request for key '" << key << "' locally");
329
330 // * inform listeners *
331 if ( listener )
332 {
333 listener->handle_dht_answer(key, values);
334 }
335 }
336
337
338 // an empty key could have been created
339 // ---> remove it
340 if ( entries.size() == 0 )
341 {
342 table.erase(key);
343 }
344}
345
346
347void Dht::send_meet_message(const std::string& key, const std::string& value)
348{
349 // send put&get message
350 DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, MEET_DHT_TTL);
351
352 handle_dht_message(msg, NodeID::UNSPECIFIED);
353
354 // program timer for republish (or deletion)
355 Key_Value* kv = new Key_Value;
356 kv->key = key;
357 kv->value = value;
358
359 SystemQueue::instance().scheduleEvent(
360 SystemEvent( this, DhtRepublishEvent, kv),
361 MEET_REPUBLISH_INTERVAL * 1000 );
362}
363
364
365void Dht::meet_update_event(const std::string& key, const std::string& value)
366{
367 // get entries from table
368 DhtTableType::mapped_type& entries = meet_store[key];
369
370 cleanup_entries(entries);
371
372 // find right entry
373 BOOST_FOREACH(const ValueEntry& entry, entries) {
374 if (entry.get_value() == value) {
375
376 // republish value
377 logging_debug("DHT: Republishing "
378 "(" << key << ", " << entry.get_value() << ")");
379 send_meet_message(key, entry.get_value());
380 }
381 }
382
383 // an empty key could have been created
384 // ---> remove it
385 if ( entries.size() == 0 )
386 {
387 meet_store.erase(key);
388 }
389}
390
391void Dht::schedule_cleanup_event(bool reschedule)
392{
393 if (reschedule || !cleanup_running) {
394 SystemQueue::instance().scheduleEvent(
395 SystemEvent(this, DhtCleanupEvent),
396 CLEANUP_INTERVAL * 1000);
397 cleanup_running = true;
398 }
399}
400
401
402void Dht::print_dht()
403{
404 logging_debug("======== DHT ========");
405 for ( DhtTableType::iterator dht_it = table.begin(); dht_it != table.end(); dht_it++)
406 {
407 logging_debug("Key: " << dht_it->first);
408
409 for ( DhtTableType::mapped_type::iterator value_it = dht_it->second.begin();
410 value_it != dht_it->second.end();
411 value_it++ )
412 {
413 logging_debug("--> " << value_it->get_value());
414 }
415
416 logging_debug("- - - - -");
417 }
418
419 logging_debug("======== [DHT] ========");
420}
421
422
423
424void Dht::onMessage(const ariba::DataMessage& msg, const ariba::NodeID& source,
425 const ariba::LinkID& lnk)
426{
427 logging_debug("DHT: Incoming message...");
428
429 DhtMessage* mess = msg.getMessage()->convert<DhtMessage> ();
430
431 // handle message
432 switch (mess->getType())
433 {
434 // BRANCH: Message is an Answer for our request
435 case DhtMessage::DhtAnswer:
436 {
437 logging_debug("DHT: Got answer for key '" << mess->getKey() << "'");
438
439 BOOST_FOREACH(string str, mess->getValues())
440 {
441 logging_debug("--> Value: '" << str << "'");
442 }
443
444 // * inform listeners *
445 if ( listener )
446 {
447 listener->handle_dht_answer(mess->getKey(), mess->getValues());
448 }
449
450 break;
451 }
452
453 // BRANCH: Message is a Request
454 // ---> route or handle
455 default:
456 {
457 handle_dht_message(*mess, source);
458
459 break;
460 }
461 }
462
463 delete mess;
464}
465
466
467void Dht::handleSystemEvent( const SystemEvent& event )
468{
469
470 if (event.getType() == DhtRepublishEvent) {
471 logging_debug("DHT: Meet republish event!");
472
473 // republish meet entry
474 Key_Value* kv = event.getData<Key_Value>();
475 meet_update_event(kv->key, kv->value);
476 delete kv;
477
478 } else if (event.getType() == DhtCleanupEvent) {
479 logging_debug("DHT: Cleanup event!");
480
481 cleanup_table(table);
482 schedule_cleanup_event(true);
483 }
484}
485
486
487/**************
488 * ValueEntry *
489 **************/
490
491Dht::ValueEntry::ValueEntry(
492 const std::string& value,
493 uint16_t ttl) :
494 ttl(ttl),
495 last_update(second_clock<ptime>::universal_time()),
496 value(value)
497{
498}
499
500
501void Dht::ValueEntry::refresh() {
502 last_update = second_clock<ptime>::universal_time();
503}
504
505
506const std::string& Dht::ValueEntry::get_value() const {
507 return value;
508}
509
510uint16_t Dht::ValueEntry::get_age() const
511{
512 boost::posix_time::time_duration diff =
513 second_clock<ptime>::universal_time() - last_update;
514
515 return diff.total_seconds();
516}
517
518uint16_t Dht::ValueEntry::get_ttl() const {
519 return ttl;
520}
521
522void Dht::ValueEntry::set_ttl(uint16_t ttl) {
523 this->refresh();
524 this->ttl = ttl;
525}
526
527bool Dht::ValueEntry::is_ttl_elapsed() const {
528 // ttl == 0 signals infinite lifetime
529 if (ttl == 0) {
530 return false;
531 }
532
533 return second_clock<ptime>::universal_time() >=
534 (last_update + boost::posix_time::seconds(ttl));
535}
536
537uint16_t Dht::ValueEntry::get_remaining_ttl() const
538{
539 if ( ttl == 0 )
540 return -1;
541
542 if ( is_ttl_elapsed() )
543 return 0;
544
545 boost::posix_time::time_duration diff =
546 (last_update + boost::posix_time::seconds(ttl)) -
547 second_clock<ptime>::universal_time();
548
549 return ttl - get_age();
550}
551
552}} /* namespace ariba_service::dht */
Note: See TracBrowser for help on using the repository browser.