An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/services/ariba_dht/Dht.cpp @ 10700

Last change on this file since 10700 was 10700, checked in by Michael Tänzer, 7 years ago

Merge CMake branch into trunk

File size: 12.3 KB
Line 
1/*
2 * Dht.cpp
3 *
4 *  Created on: 20.06.2012
5 *      Author: mario
6 */
7
8#include "Dht.h"
9#include "messages/DhtMessage.h"
10#include <boost/date_time/time_clock.hpp>
11
12namespace ariba_service {
13namespace dht {
14
15use_logging_cpp(Dht)
16
17using namespace std;
18using boost::date_time::second_clock;
19using boost::posix_time::ptime;
20
21SystemEventType DhtRepublishEvent("DhtRepublishEvent");
22SystemEventType DhtCleanupEvent("DhtCleanupEvent");
23
24
25Dht::Dht(ariba::ServiceID serviceID, ariba::Node* node)  :
26        serviceID(serviceID),
27        node(node),
28        cleanup_running(false),
29        listener(NULL)
30{
31    this->node->bind(this, serviceID);
32}
33
34Dht::~Dht()
35{
36    this->node->unbind(this, serviceID);
37}
38
39
40
41void Dht::put(const std::string& key, const std::string& value, uint16_t ttl)
42{
43    DhtMessage msg(DhtMessage::DhtPut, key, value, ttl);
44
45    handle_dht_message(msg, NodeID::UNSPECIFIED);
46}
47
48void Dht::get(const std::string& key)
49{
50        DhtMessage msg(DhtMessage::DhtGet, key);
51       
52        handle_dht_message(msg, NodeID::UNSPECIFIED);
53}
54
55void Dht::atomic_put_and_get(const std::string& key, const std::string& value, uint16_t ttl)
56{   
57    DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, ttl);
58
59    handle_dht_message(msg, NodeID::UNSPECIFIED);
60}
61
62void Dht::meet(const std::string& key, const std::string& value, uint16_t ttl_in_sec)
63{
64    // insert into meet_store
65    insert_into_table(meet_store,
66                key,
67                std::vector<std::string>(1, value),
68                ttl_in_sec);
69       
70    // send message (and program republishing)
71    send_meet_message(key, value);
72}
73
74void Dht::stop_meet(const std::string& key, const std::string& value)
75{
76    remove_from_table(meet_store,
77                key,
78                std::vector<std::string>(1, value));
79}
80
81void Dht::remove(const std::string& key, const std::string& value)
82{
83    // send delete message
84    DhtMessage msg(DhtMessage::DhtRemove, key, value);
85
86    handle_dht_message(msg, NodeID::UNSPECIFIED);
87}
88
89
90
91
92bool Dht::add_listener(DhtAnswerInterface* new_listener)
93{
94    if ( listener == NULL )
95    {
96        listener = new_listener;
97       
98        return true;
99    }
100    else
101    {
102        return false;
103    }
104}
105
106bool Dht::remove_listener(DhtAnswerInterface* new_listener)
107{
108        if (listener == new_listener) {
109                listener = NULL;
110                return true;
111               
112        } else {
113                return false;
114        }
115}
116
117
118
119
120//** PRIVATE FUNCTIONS **//
121
122void Dht::handle_dht_message(const DhtMessage& message, const NodeID& source)
123{
124    // send message closer to hashed key
125    NodeID addr = message.getHashedKey();
126
127    logging_debug("Processing DHT message...");
128   
129    logging_debug("Dest Addr: " << addr.toString());
130
131    // * send closer, if possible *
132    const ariba::NodeID dest = node->sendMessageCloserToNodeID(message, addr, this->serviceID);
133   
134    logging_debug("Closer Node: " << dest.toString());
135   
136    // couldn't send closer, so we are the closest node
137    //   ---> * handle dht request * (store value, etc.)
138    if ( dest == NodeID::UNSPECIFIED )
139    {
140        logging_debug("DHT: We are the closest node!");
141       
142        switch (message.getType())
143        {
144            case DhtMessage::DhtPut:
145            {
146                insert_into_table(
147                                table,
148                                message.getKey(),
149                                message.getValues(),
150                                message.getTTL());
151               
152                break;
153            }
154           
155            case DhtMessage::DhtGet:
156            {
157                answer_dht_request(message.getKey(), source);
158
159                break;
160            }
161           
162            case DhtMessage::DhtPutAndGet:
163            {
164                insert_into_table(
165                                table,
166                                message.getKey(),
167                                message.getValues(),
168                                message.getTTL());
169                answer_dht_request(message.getKey(), source);
170               
171                break;
172            }
173           
174            case DhtMessage::DhtRemove:
175            {
176                remove_from_table(table, message.getKey(), message.getValues());
177               
178                break;
179            }
180        }
181    }
182}
183
184
185void Dht::insert_into_table(DhtTableType& table,
186                const std::string& key,
187                const vector<std::string>& values,
188                uint16_t ttl)
189{
190        DhtTableType::mapped_type& value_entries = table[key];
191       
192        BOOST_FOREACH(const std::string& value, values) {
193               
194                // Debug output
195                logging_info("DHT: Inserting (" << key << ", " << value << ")");
196               
197                // push the value for the given key (into the vector)
198                bool entry_updated = false;
199                for (
200                                DhtTableType::mapped_type::iterator position = value_entries.begin();
201                                position != value_entries.end();
202                                ++position)
203                {
204                        if (position->get_value() == value) {
205                                position->set_ttl(ttl);
206                                entry_updated = true;
207                                break;
208                        }
209                }
210               
211                if (!entry_updated) {
212                        value_entries.push_back(ValueEntry(value, ttl));
213                }
214        }
215       
216        schedule_cleanup_event();
217}
218
219
220void Dht::remove_from_table(DhtTableType& table,
221                const std::string& key,
222                const vector<std::string>& values)
223{
224        logging_debug("DHT: trying to delete some values for key " << key);
225        // find key
226        DhtTableType::iterator key_position = table.find(key);
227        if (key_position == table.end()) {
228                return;
229        }
230       
231        // delete values from set of values
232        DhtTableType::mapped_type& entries = key_position->second;
233        BOOST_FOREACH(const std::string& value, values) {
234                for (
235                                DhtTableType::mapped_type::iterator entry = entries.begin();
236                                entry != entries.end();
237                                ++entry)
238                {
239                        if (entry->get_value() == value) {
240                                logging_info("DHT: Deleting "
241                                                "(" <<key << ", " << entry->get_value() << ")");
242                                entries.erase(entry);
243                                break;
244                        }
245                }
246        }
247       
248    // the key could empty now
249    //   ---> remove it
250    if ( entries.size() == 0 )
251    {
252        table.erase(key_position);
253    }
254}
255
256
257void Dht::cleanup_table(DhtTableType& table)
258{
259        logging_debug("DHT: cleaning up table");
260       
261        vector<std::string> to_be_deleted;
262       
263        for (
264                        DhtTableType::iterator position = table.begin();
265                        position != table.end();
266                        ++position)
267        {
268                cleanup_entries(position->second);
269               
270                // mark entry container for removal if empty
271                if (position->second.size() == 0) {
272                        to_be_deleted.push_back(position->first);
273                }
274        }
275       
276        BOOST_FOREACH(const std::string& key, to_be_deleted) {
277                table.erase(key);
278        }
279}
280
281void Dht::cleanup_entries(DhtTableType::mapped_type& entries)
282{
283        DhtTableType::mapped_type::iterator position = entries.begin();
284        while (position != entries.end()) {
285               
286                if (position->is_ttl_elapsed()) {
287                        // remove stale entry
288                        position = entries.erase(position);
289                       
290                } else {
291                        // move on otherwise
292                        ++position;
293                }
294        }
295}
296
297
298void Dht::answer_dht_request(const std::string& key, const NodeID& source)
299{
300    // get entries from table
301    const DhtTableType::mapped_type& entries = table[key];
302   
303    // need to convert value entries to strings
304        vector<std::string> values;
305        values.reserve(entries.size());
306        BOOST_FOREACH(const ValueEntry& entry, entries) {
307               
308                if (!entry.is_ttl_elapsed()) {
309                        values.push_back(entry.get_value());
310                }
311               
312        }
313   
314    // BRANCH: request comes from another node
315    //   ---> send answer message
316    if ( source != NodeID::UNSPECIFIED )
317    {
318        // create answer message
319        DhtMessage msg(DhtMessage::DhtAnswer, key, values);
320       
321        // * send answer *
322        node->sendMessage(msg, source, serviceID);
323    }
324   
325    // BRANCH: local request
326    //   ---> inform listeners directly (TODO code duplicates...)
327    else
328    {
329        logging_debug("DHT: Answering request for key '" << key << "' locally");
330
331        // * inform listeners *
332        if ( listener )
333        {
334            listener->handle_dht_answer(key, values);
335        }
336    }
337   
338   
339    // an empty key could have been created
340    //   ---> remove it
341    if ( entries.size() == 0 )
342    {
343        table.erase(key);
344    }
345}
346
347
348void Dht::send_meet_message(const std::string& key, const std::string& value)
349{
350    // send put&get message
351    DhtMessage msg(DhtMessage::DhtPutAndGet, key, value, MEET_DHT_TTL);
352   
353    handle_dht_message(msg, NodeID::UNSPECIFIED);
354   
355    // program timer for republish (or deletion)
356    Key_Value* kv = new Key_Value;
357    kv->key = key;
358    kv->value = value;
359   
360    SystemQueue::instance().scheduleEvent( 
361            SystemEvent( this, DhtRepublishEvent, kv),
362            MEET_REPUBLISH_INTERVAL * 1000 );
363}
364
365
366void Dht::meet_update_event(const std::string& key, const std::string& value)
367{
368    // get entries from table
369    DhtTableType::mapped_type& entries = meet_store[key];
370   
371    cleanup_entries(entries);
372   
373    // find right entry
374    BOOST_FOREACH(const ValueEntry& entry, entries) {
375        if (entry.get_value() == value) {
376               
377                // republish value
378                logging_debug("DHT: Republishing "
379                                "(" << key << ", " << entry.get_value() << ")");
380                send_meet_message(key, entry.get_value());
381        }
382    }
383   
384    // an empty key could have been created
385    //   ---> remove it
386    if ( entries.size() == 0 )
387    {
388        meet_store.erase(key);
389    }
390}
391
392void Dht::schedule_cleanup_event(bool reschedule)
393{
394        if (reschedule || !cleanup_running) {
395                SystemQueue::instance().scheduleEvent(
396                                SystemEvent(this, DhtCleanupEvent),
397                                CLEANUP_INTERVAL * 1000);
398                cleanup_running = true;
399        }
400}
401
402
403void Dht::print_dht()
404{
405        logging_debug("======== DHT ========");
406    for ( DhtTableType::iterator dht_it = table.begin(); dht_it != table.end(); dht_it++)
407    {
408        logging_debug("Key: " << dht_it->first);
409       
410        for ( DhtTableType::mapped_type::iterator value_it = dht_it->second.begin();
411                value_it != dht_it->second.end();
412                value_it++ )
413        {
414                logging_debug("--> " << value_it->get_value());
415        }
416       
417        logging_debug("- - - - -");
418    }
419   
420    logging_debug("======== [DHT] ========");
421}
422
423
424
425void Dht::onMessage(const ariba::DataMessage& msg, const ariba::NodeID& source,
426        const ariba::LinkID& lnk)
427{
428        logging_debug("DHT: Incoming message...");
429   
430    DhtMessage* mess = msg.getMessage()->convert<DhtMessage> ();
431   
432    // handle message
433    switch (mess->getType())
434    {
435        // BRANCH: Message is an Answer for our request
436        case DhtMessage::DhtAnswer:
437        {
438                logging_debug("DHT: Got answer for key '" << mess->getKey() << "'");
439           
440            BOOST_FOREACH(string str, mess->getValues())
441            {
442                logging_debug("--> Value: '" << str << "'");
443            }
444           
445            // * inform listeners *
446            if ( listener )
447            {
448                listener->handle_dht_answer(mess->getKey(), mess->getValues());
449            }
450
451            break;
452        }
453       
454        // BRANCH: Message is a Request
455        //   ---> route or handle
456        default:
457        {
458            handle_dht_message(*mess, source);
459           
460            break;
461        }
462    }
463
464    delete mess;
465}
466
467
468void Dht::handleSystemEvent( const SystemEvent& event )
469{
470       
471        if (event.getType() == DhtRepublishEvent) {
472                logging_debug("DHT: Meet republish event!");
473               
474                // republish meet entry
475                Key_Value* kv = event.getData<Key_Value>();
476                meet_update_event(kv->key, kv->value);
477                delete kv;
478               
479        } else if (event.getType() == DhtCleanupEvent) {
480                logging_debug("DHT: Cleanup event!");
481               
482                cleanup_table(table);
483                schedule_cleanup_event(true);
484        }
485}
486
487
488/**************
489 * ValueEntry *
490 **************/
491
492Dht::ValueEntry::ValueEntry(
493                const std::string& value,
494                uint16_t ttl) :
495        ttl(ttl),
496        last_update(second_clock<ptime>::universal_time()),
497        value(value)
498{
499}
500
501
502void Dht::ValueEntry::refresh() {
503        last_update = second_clock<ptime>::universal_time();
504}
505
506
507const std::string& Dht::ValueEntry::get_value() const {
508        return value;
509}
510
511uint16_t Dht::ValueEntry::get_age() const
512{
513    boost::posix_time::time_duration diff = 
514            second_clock<ptime>::universal_time() - last_update;
515   
516    return diff.total_seconds();
517}
518
519uint16_t Dht::ValueEntry::get_ttl() const {
520        return ttl;
521}
522
523void Dht::ValueEntry::set_ttl(uint16_t ttl) {
524        this->refresh();
525        this->ttl = ttl;
526}
527
528bool Dht::ValueEntry::is_ttl_elapsed() const {
529        // ttl == 0 signals infinite lifetime
530        if (ttl == 0) {
531                return false;
532        }
533       
534        return second_clock<ptime>::universal_time() >= 
535                        (last_update + boost::posix_time::seconds(ttl));
536}
537
538uint16_t Dht::ValueEntry::get_remaining_ttl() const
539{
540    if ( ttl == 0 )
541        return -1;
542   
543    if ( is_ttl_elapsed() )
544        return 0;
545   
546    boost::posix_time::time_duration diff = 
547            (last_update + boost::posix_time::seconds(ttl)) -
548            second_clock<ptime>::universal_time();
549   
550    return ttl - get_age();
551}
552
553}} /* namespace ariba_service::dht */
Note: See TracBrowser for help on using the repository browser.