source: source/services/ariba_dht/Dht.cpp@ 12060

Last change on this file since 12060 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 12.4 KB
Line 
1/*
2 * Dht.cpp
3 *
4 * Created on: 20.06.2012
5 * Author: mario
6 */
7
8#include "Dht.h"
9#include "messages/DhtMessage.h"
10#include <boost/date_time/time_clock.hpp>
11
12namespace ariba_service {
13namespace dht {
14
15use_logging_cpp(Dht)
16
17using namespace std;
18using boost::date_time::second_clock;
19using boost::posix_time::ptime;
20
21SystemEventType DhtRepublishEvent("DhtRepublishEvent");
22SystemEventType DhtCleanupEvent("DhtCleanupEvent");
23
24
25Dht::Dht(ariba::ServiceID serviceID, ariba::Node* node) :
26 serviceID(serviceID),
27 node(node),
28 cleanup_running(false),
29 listener(NULL)
30{
31 this->node->bind(this, serviceID);
32}
33
34Dht::~Dht()
35{
36 this->node->unbind(this, serviceID);
37}
38
39
40
41void Dht::put(const std::string& key, const std::string& value, uint16_t ttl)
42{
43 DhtMessage msg(DhtMessage::DhtPut, key, value, ttl);
44
45 handle_dht_message(msg, NodeID::UNSPECIFIED);
46}
47
48void Dht::get(const std::string& key)
49{
50 DhtMessage msg(DhtMessage::DhtGet, key, node->getNodeId());
51
52 handle_dht_message(msg, NodeID::UNSPECIFIED);
53}
54
55void Dht::atomic_put_and_get(const std::string& key, const std::string& value, uint16_t ttl)
56{
57 DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, ttl);
58
59 handle_dht_message(msg, NodeID::UNSPECIFIED);
60}
61
62void Dht::meet(const std::string& key, const std::string& value, uint16_t ttl_in_sec)
63{
64 // insert into meet_store
65 insert_into_table(meet_store,
66 key,
67 std::vector<std::string>(1, value),
68 ttl_in_sec);
69
70 // send message (and program republishing)
71 send_meet_message(key, value);
72}
73
74void Dht::stop_meet(const std::string& key, const std::string& value)
75{
76 remove_from_table(meet_store,
77 key,
78 std::vector<std::string>(1, value));
79}
80
81void Dht::remove(const std::string& key, const std::string& value)
82{
83 // send delete message
84 DhtMessage msg(DhtMessage::DhtRemove, key, value);
85
86 handle_dht_message(msg, NodeID::UNSPECIFIED);
87}
88
89
90
91
92bool Dht::add_listener(DhtAnswerInterface* new_listener)
93{
94 if ( listener == NULL )
95 {
96 listener = new_listener;
97
98 return true;
99 }
100 else
101 {
102 return false;
103 }
104}
105
106bool Dht::remove_listener(DhtAnswerInterface* new_listener)
107{
108 if (listener == new_listener) {
109 listener = NULL;
110 return true;
111
112 } else {
113 return false;
114 }
115}
116
117
118
119
120//** PRIVATE FUNCTIONS **//
121
122void Dht::handle_dht_message(const DhtMessage& message, const NodeID& source)
123{
124 // send message closer to hashed key
125 NodeID addr = message.getHashedKey();
126
127 logging_debug("Processing DHT message...");
128
129 logging_debug("Dest Addr: " << addr.toString());
130
131 // * send closer, if possible *
132 const ariba::NodeID dest = node->sendMessageCloserToNodeID(message, addr, this->serviceID);
133
134 logging_debug("Closer Node: " << dest.toString());
135
136 // couldn't send closer, so we are the closest node
137 // ---> * handle dht request * (store value, etc.)
138 if ( dest == NodeID::UNSPECIFIED )
139 {
140 logging_debug("DHT: We are the closest node!");
141
142 switch (message.getType())
143 {
144 case DhtMessage::DhtPut:
145 {
146 insert_into_table(
147 table,
148 message.getKey(),
149 message.getValues(),
150 message.getTTL());
151
152 break;
153 }
154
155 case DhtMessage::DhtGet:
156 {
157 answer_dht_request(message.getKey(), message.getSourceNode());
158
159 break;
160 }
161
162 case DhtMessage::DhtPutAndGet:
163 {
164 insert_into_table(
165 table,
166 message.getKey(),
167 message.getValues(),
168 message.getTTL());
169 answer_dht_request(message.getKey(), message.getSourceNode());
170
171 break;
172 }
173
174 case DhtMessage::DhtRemove:
175 {
176 remove_from_table(table, message.getKey(), message.getValues());
177
178 break;
179 }
180 }
181 }
182}
183
184
185void Dht::insert_into_table(DhtTableType& table,
186 const std::string& key,
187 const vector<std::string>& values,
188 uint16_t ttl)
189{
190 DhtTableType::mapped_type& value_entries = table[key];
191
192 BOOST_FOREACH(const std::string& value, values) {
193
194 // Debug output
195 logging_info("DHT: Inserting (" << key << ", " << value << ")");
196
197 // push the value for the given key (into the vector)
198 bool entry_updated = false;
199 for (
200 DhtTableType::mapped_type::iterator position = value_entries.begin();
201 position != value_entries.end();
202 ++position)
203 {
204 if (position->get_value() == value) {
205 position->set_ttl(ttl);
206 entry_updated = true;
207 break;
208 }
209 }
210
211 if (!entry_updated) {
212 value_entries.push_back(ValueEntry(value, ttl));
213 }
214 }
215
216 schedule_cleanup_event();
217}
218
219
220void Dht::remove_from_table(DhtTableType& table,
221 const std::string& key,
222 const vector<std::string>& values)
223{
224 logging_debug("DHT: trying to delete some values for key " << key);
225 // find key
226 DhtTableType::iterator key_position = table.find(key);
227 if (key_position == table.end()) {
228 return;
229 }
230
231 // delete values from set of values
232 DhtTableType::mapped_type& entries = key_position->second;
233 BOOST_FOREACH(const std::string& value, values) {
234 for (
235 DhtTableType::mapped_type::iterator entry = entries.begin();
236 entry != entries.end();
237 ++entry)
238 {
239 if (entry->get_value() == value) {
240 logging_info("DHT: Deleting "
241 "(" <<key << ", " << entry->get_value() << ")");
242 entries.erase(entry);
243 break;
244 }
245 }
246 }
247
248 // the key could empty now
249 // ---> remove it
250 if ( entries.size() == 0 )
251 {
252 table.erase(key_position);
253 }
254}
255
256
257void Dht::cleanup_table(DhtTableType& table)
258{
259 logging_debug("DHT: cleaning up table");
260
261 vector<std::string> to_be_deleted;
262
263 for (
264 DhtTableType::iterator position = table.begin();
265 position != table.end();
266 ++position)
267 {
268 cleanup_entries(position->second);
269
270 // mark entry container for removal if empty
271 if (position->second.size() == 0) {
272 to_be_deleted.push_back(position->first);
273 }
274 }
275
276 BOOST_FOREACH(const std::string& key, to_be_deleted) {
277 table.erase(key);
278 }
279}
280
281void Dht::cleanup_entries(DhtTableType::mapped_type& entries)
282{
283 DhtTableType::mapped_type::iterator position = entries.begin();
284 while (position != entries.end()) {
285
286 if (position->is_ttl_elapsed()) {
287 // remove stale entry
288 position = entries.erase(position);
289
290 } else {
291 // move on otherwise
292 ++position;
293 }
294 }
295}
296
297
298void Dht::answer_dht_request(const std::string& key, const NodeID& source)
299{
300 // get entries from table
301 const DhtTableType::mapped_type& entries = table[key];
302
303 // need to convert value entries to strings
304 vector<std::string> values;
305 values.reserve(entries.size());
306 BOOST_FOREACH(const ValueEntry& entry, entries) {
307
308 if (!entry.is_ttl_elapsed()) {
309 values.push_back(entry.get_value());
310 }
311
312 }
313
314 // BRANCH: request comes from another node
315 // ---> send answer message
316 if ( source != NodeID::UNSPECIFIED )
317 {
318 // create answer message
319 DhtMessage msg(DhtMessage::DhtAnswer, key, values);
320
321 // * send answer *
322 node->sendMessage(msg, source, serviceID);
323 }
324
325 // BRANCH: local request
326 // ---> inform listeners directly (TODO code duplicates...)
327 else
328 {
329 logging_debug("DHT: Answering request for key '" << key << "' locally");
330
331 // * inform listeners *
332 if ( listener )
333 {
334 listener->handle_dht_answer(key, values);
335 }
336 }
337
338
339 // an empty key could have been created
340 // ---> remove it
341 if ( entries.size() == 0 )
342 {
343 table.erase(key);
344 }
345}
346
347
348void Dht::send_meet_message(const std::string& key, const std::string& value)
349{
350 // send put&get message
351 DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, MEET_DHT_TTL);
352
353 handle_dht_message(msg, NodeID::UNSPECIFIED);
354
355 // program timer for republish (or deletion)
356 Key_Value* kv = new Key_Value;
357 kv->key = key;
358 kv->value = value;
359
360 SystemQueue::instance().scheduleEvent(
361 SystemEvent( this, DhtRepublishEvent, kv),
362 MEET_REPUBLISH_INTERVAL * 1000 );
363}
364
365
366void Dht::meet_update_event(const std::string& key, const std::string& value)
367{
368 // get entries from table
369 DhtTableType::mapped_type& entries = meet_store[key];
370
371 cleanup_entries(entries);
372
373 // find right entry
374 BOOST_FOREACH(const ValueEntry& entry, entries) {
375 if (entry.get_value() == value) {
376
377 // republish value
378 logging_debug("DHT: Republishing "
379 "(" << key << ", " << entry.get_value() << ")");
380 send_meet_message(key, entry.get_value());
381 }
382 }
383
384 // an empty key could have been created
385 // ---> remove it
386 if ( entries.size() == 0 )
387 {
388 meet_store.erase(key);
389 }
390}
391
392void Dht::schedule_cleanup_event(bool reschedule)
393{
394 if (reschedule || !cleanup_running) {
395 SystemQueue::instance().scheduleEvent(
396 SystemEvent(this, DhtCleanupEvent),
397 CLEANUP_INTERVAL * 1000);
398 cleanup_running = true;
399 }
400}
401
402
403void Dht::print_dht()
404{
405 logging_debug("======== DHT ========");
406 for ( DhtTableType::iterator dht_it = table.begin(); dht_it != table.end(); dht_it++)
407 {
408 logging_debug("Key: " << dht_it->first);
409
410 for ( DhtTableType::mapped_type::iterator value_it = dht_it->second.begin();
411 value_it != dht_it->second.end();
412 value_it++ )
413 {
414 logging_debug("--> " << value_it->get_value());
415 }
416
417 logging_debug("- - - - -");
418 }
419
420 logging_debug("======== [DHT] ========");
421}
422
423
424
425void Dht::onMessage(const ariba::DataMessage& msg, const ariba::NodeID& source,
426 const ariba::LinkID& lnk)
427{
428 logging_debug("DHT: Incoming message...");
429
430 DhtMessage* mess = msg.getMessage()->convert<DhtMessage> ();
431
432 // handle message
433 switch (mess->getType())
434 {
435 // BRANCH: Message is an Answer for our request
436 case DhtMessage::DhtAnswer:
437 {
438 logging_debug("DHT: Got answer for key '" << mess->getKey() << "'");
439
440 BOOST_FOREACH(string str, mess->getValues())
441 {
442 logging_debug("--> Value: '" << str << "'");
443 }
444
445 // * inform listeners *
446 if ( listener )
447 {
448 listener->handle_dht_answer(mess->getKey(), mess->getValues());
449 }
450
451 break;
452 }
453
454 // BRANCH: Message is a Request
455 // ---> route or handle
456 default:
457 {
458 handle_dht_message(*mess, source);
459
460 break;
461 }
462 }
463
464 delete mess;
465}
466
467
468void Dht::handleSystemEvent( const SystemEvent& event )
469{
470
471 if (event.getType() == DhtRepublishEvent) {
472 logging_debug("DHT: Meet republish event!");
473
474 // republish meet entry
475 Key_Value* kv = event.getData<Key_Value>();
476 meet_update_event(kv->key, kv->value);
477 delete kv;
478
479 } else if (event.getType() == DhtCleanupEvent) {
480 logging_debug("DHT: Cleanup event!");
481
482 cleanup_table(table);
483 schedule_cleanup_event(true);
484 }
485}
486
487
488/**************
489 * ValueEntry *
490 **************/
491
492Dht::ValueEntry::ValueEntry(
493 const std::string& value,
494 uint16_t ttl) :
495 ttl(ttl),
496 last_update(second_clock<ptime>::universal_time()),
497 value(value)
498{
499}
500
501
502void Dht::ValueEntry::refresh() {
503 last_update = second_clock<ptime>::universal_time();
504}
505
506
507const std::string& Dht::ValueEntry::get_value() const {
508 return value;
509}
510
511uint16_t Dht::ValueEntry::get_age() const
512{
513 boost::posix_time::time_duration diff =
514 second_clock<ptime>::universal_time() - last_update;
515
516 return diff.total_seconds();
517}
518
519uint16_t Dht::ValueEntry::get_ttl() const {
520 return ttl;
521}
522
523void Dht::ValueEntry::set_ttl(uint16_t ttl) {
524 this->refresh();
525 this->ttl = ttl;
526}
527
528bool Dht::ValueEntry::is_ttl_elapsed() const {
529 // ttl == 0 signals infinite lifetime
530 if (ttl == 0) {
531 return false;
532 }
533
534 return second_clock<ptime>::universal_time() >=
535 (last_update + boost::posix_time::seconds(ttl));
536}
537
538uint16_t Dht::ValueEntry::get_remaining_ttl() const
539{
540 if ( ttl == 0 )
541 return -1;
542
543 if ( is_ttl_elapsed() )
544 return 0;
545
546 boost::posix_time::time_duration diff =
547 (last_update + boost::posix_time::seconds(ttl)) -
548 second_clock<ptime>::universal_time();
549
550 return ttl - get_age();
551}
552
553}} /* namespace ariba_service::dht */
Note: See TracBrowser for help on using the repository browser.