Changeset 6266
- Timestamp:
- Sep 25, 2009, 2:30:33 PM (15 years ago)
- Location:
- source/ariba
- Files:
-
- 2 added
- 13 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/CommunicationListener.cpp
r3374 r6266 62 62 63 63 bool CommunicationListener::onLinkRequest(const NodeID& remote, 64 64 const DataMessage& msg) { 65 65 return true; 66 66 } 67 67 68 68 void CommunicationListener::onMessage(const DataMessage& msg, 69 69 const NodeID& remote, const LinkID& lnk) { 70 70 } 71 71 … … 73 73 return true; 74 74 } 75 76 void CommunicationListener::onKeyValue( const Data& key, const vector<Data>& value ) { 77 78 } 79 75 80 76 81 // --- extended message functionality --- -
source/ariba/CommunicationListener.h
r4075 r6266 86 86 87 87 // --- sniffing related method --- 88 88 89 virtual bool onEnableSideportListener(); 89 90 91 // --- dht functionality --- 92 93 virtual void onKeyValue( const Data& key, const vector<Data>& value ); 94 90 95 // --- extended message functionality --- 96 91 97 // virtual void onLinkQoSChanged(const LinkID& lnk, const NodeID& remote, 92 98 // const LinkProperties& prop); 93 99 94 100 // --- extended message functionality --- 101 95 102 // virtual void onMessageSent(seqnum_t seq_num, bool failed, 96 103 // const DataMessage& msg = DataMessage::UNSPECIFIED); 97 98 // --- dht functionality ---99 // virtual void onGetResponse( const Identifier<> id, const Message* msg );100 // virtual void onPutResponse( const Identifier<> id, const Message* msg );101 104 }; 102 105 -
source/ariba/Makefile.am
r5993 r6266 144 144 overlay/messages/JoinReply.cpp \ 145 145 overlay/messages/JoinRequest.cpp \ 146 overlay/messages/DHTMessage.cpp \ 146 147 overlay/messages/OverlayMsg.cpp 147 148 … … 149 150 overlay/messages/JoinReply.h \ 150 151 overlay/messages/JoinRequest.h \ 152 overlay/messages/DHTMessage.h\ 151 153 overlay/messages/OverlayMsg.h 152 154 -
source/ariba/Node.cpp
r5412 r6266 176 176 177 177 // service directory 178 /* 179 void Node::put(const Identifier<>& key, Message* value) {180 } 181 182 void Node::get(const Identifier<>& key) { 183 184 } 185 */ 178 179 void Node::put( const Data& key, const Data& value, uint16_t ttl ) { 180 base_overlay->dhtPut(key,value,ttl); 181 } 182 183 void Node::get( const Data& key, const ServiceID& sid ) { 184 base_overlay->dhtGet(key,sid); 185 } 186 186 187 187 // @see Module.h -
source/ariba/Node.h
r3578 r6266 282 282 bool unbind(CommunicationListener* listener, const ServiceID& sid); 283 283 284 // --- extension proposal: service directory -- 285 // main-idea: use this methods to register groups/rendevous points inside 286 // the base overlay via a distributed storage service. 287 // 288 //void put(const KeyID& key, Message* value); 289 //void get(const KeyID& key); 284 /** 285 * Adds a key value pair to the DHT 286 * 287 * @param key The key data 288 * @param value The value data 289 * @param ttl The time to live in seconds 290 */ 291 void put( const Data& key, const Data& value, uint16_t ttl ); 292 293 /** 294 * Queries for values stored in the DHT. Fires an communication event when 295 * values arrive. 296 * 297 * @param key The key data 298 * @param sid The service that is requesting the values 299 */ 300 void get( const Data& key, const ServiceID& sid ); 301 302 290 303 //------------------------------------------------------------------------- 291 304 // -
source/ariba/overlay/BaseOverlay.cpp
r6202 r6266 51 51 52 52 #include "ariba/overlay/messages/OverlayMsg.h" 53 #include "ariba/overlay/messages/DHTMessage.h" 53 54 #include "ariba/overlay/messages/JoinRequest.h" 54 55 #include "ariba/overlay/messages/JoinReply.h" … … 58 59 namespace ariba { 59 60 namespace overlay { 61 62 class ValueEntry { 63 public: 64 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)), 65 last_change(time(NULL)), value(value.clone()) { 66 } 67 68 ~ValueEntry() { 69 value.release(); 70 } 71 72 void refresh() { 73 last_update = time(NULL); 74 } 75 76 void set_value( const Data& value ) { 77 this->value.release(); 78 this->value = value.clone(); 79 this->last_change = time(NULL); 80 this->last_update = time(NULL); 81 } 82 83 Data get_value() const { 84 return value; 85 } 86 87 uint16_t get_ttl() const { 88 return ttl; 89 } 90 91 void set_ttl( uint16_t ttl ) { 92 this->ttl = ttl; 93 } 94 95 bool is_ttl_elapsed() const { 96 // is persistent? yes-> always return false 97 if (ttl==0) return false; 98 99 // return true, if ttl is elapsed 100 return ( difftime( time(NULL), this->last_update ) >= ttl ); 101 } 102 103 private: 104 uint16_t ttl; 105 time_t last_update; 106 time_t last_change; 107 Data value; 108 }; 109 110 class DHTEntry { 111 public: 112 Data key; 113 vector<ValueEntry> values; 114 115 vector<Data> get_values() { 116 vector<Data> vect; 117 BOOST_FOREACH( ValueEntry& e, values ) 118 vect.push_back( e.get_value() ); 119 return vect; 120 } 121 }; 122 123 class DHT { 124 public: 125 typedef vector<DHTEntry> Entries; 126 typedef vector<ValueEntry> Values; 127 Entries entries; 128 129 static bool equals( const Data& lhs, const Data& rhs ) { 130 if (rhs.getLength()!=lhs.getLength()) return false; 131 for (int i=0; i<lhs.getLength()/8; i++) 132 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false; 133 return true; 134 } 135 136 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) { 137 138 // find entry 139 for (size_t i=0; i<entries.size(); i++) { 140 DHTEntry& entry = entries.at(i); 141 142 // check if key is already known 143 if ( equals(entry.key, key) ) { 144 145 // check if value is already in values list 146 for (size_t j=0; j<entry.values.size(); j++) { 147 // found value already? yes-> refresh ttl 148 if ( equals(entry.values[j].get_value(), value) ) { 149 entry.values[j].refresh(); 150 std::cout << "DHT: Republished value. Refreshing TTL." 151 << std::endl; 152 return; 153 } 154 } 155 156 // new value-> add to entry 157 std::cout << "DHT: Added value to " 158 << " key=" << key << " with value=" << value << std::endl; 159 entry.values.push_back( ValueEntry( value ) ); 160 entry.values.back().set_ttl(ttl); 161 return; 162 } 163 } 164 165 // key is unknown-> add key value pair 166 std::cout << "DHT: New key value pair " 167 << " key=" << key << " with value=" << value << std::endl; 168 169 // add new entry 170 entries.push_back( DHTEntry() ); 171 DHTEntry& entry = entries.back(); 172 entry.key = key.clone(); 173 entry.values.push_back( ValueEntry(value) ); 174 entry.values.back().set_ttl(ttl); 175 } 176 177 vector<Data> get( const Data& key ) { 178 // find entry 179 for (size_t i=0; i<entries.size(); i++) { 180 DHTEntry& entry = entries.at(i); 181 if ( equals(entry.key,key) ) 182 return entry.get_values(); 183 } 184 return vector<Data>(); 185 } 186 187 bool remove( const Data& key ) { 188 // find entry 189 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) { 190 DHTEntry& entry = *i; 191 192 // found? yes-> delete entry 193 if ( equals(entry.key, key) ) { 194 i = entries.erase(i); 195 return true; 196 } 197 } 198 return false; 199 } 200 201 bool remove( const Data& key, const Data& value ) { 202 // find entry 203 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) { 204 DHTEntry& entry = *i; 205 206 // found? yes-> try to find value 207 if ( equals(entry.key, key) ) { 208 for (Values::iterator j = entry.values.begin(); 209 j != entry.values.end(); j++) { 210 211 // value found? yes-> delete 212 if (equals(j->get_value(), value)) { 213 j = entry.values.erase(j); 214 return true; 215 } 216 } 217 } 218 } 219 return false; 220 } 221 222 void cleanup() { 223 // find entry 224 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) { 225 DHTEntry& entry = *i; 226 227 for (Values::iterator j = entry.values.begin(); 228 j != entry.values.end(); j++) { 229 230 // value found? yes-> delete 231 if (j->is_ttl_elapsed()) 232 j = entry.values.erase(j); 233 } 234 235 if (entry.values.size()==0) i = entries.erase(i); 236 } 237 } 238 }; 239 240 // ---------------------------------------------------------------------------- 60 241 61 242 /* ***************************************************************************** … … 545 726 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid), 546 727 sideport(&SideportListener::DEFAULT), started(false), counter(0) { 728 dht = new DHT(); 547 729 } 548 730 549 731 BaseOverlay::~BaseOverlay() { 732 delete dht; 550 733 } 551 734 … … 1582 1765 overlayMsg->addRouteRecord(nodeId); 1583 1766 1767 // handle dht messages (do not route) 1768 if (overlayMsg->isDHTMessage()) 1769 return handleDHTMessage(overlayMsg); 1770 1584 1771 // handle signaling messages (do not route!) 1585 1772 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && … … 1601 1788 return true; 1602 1789 } 1790 1791 // handle DHT response messages 1792 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) { 1793 bool ret = handleDHTMessage(overlayMsg); 1794 delete overlayMsg; 1795 return ret; 1796 } 1797 1603 1798 1604 1799 // handle base overlay message … … 1699 1894 } 1700 1895 1896 1897 // ---------------------------------------------------------------------------- 1898 1899 1900 1901 /// stabilize DHT state 1902 void BaseOverlay::stabilizeDHT() { 1903 1904 } 1905 1906 // handle DHT messages 1907 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) { 1908 1909 // decapsulate message 1910 logging_debug("received DHT message"); 1911 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>(); 1912 1913 // handle DHT data message 1914 if (msg->getType()==OverlayMsg::typeDHTData) { 1915 const ServiceID& service = msg->getService(); 1916 logging_debug( "Received DHT data for service " << service.toString() ); 1917 1918 // delegate data message 1919 getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() ); 1920 return true; 1921 } 1922 1923 // route message to closest node 1924 if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) { 1925 logging_debug("Routing DHT message to closest node " 1926 << " from " << msg->getSourceNode() 1927 << " to " << msg->getDestinationNode() 1928 ); 1929 route( msg ); 1930 delete msg; 1931 return true; 1932 } 1933 1934 // now, we are the closest node... 1935 switch (msg->getType()) { 1936 case OverlayMsg::typeDHTPut: { 1937 BOOST_FOREACH( Data value, dhtMsg->getValues() ) 1938 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() ); 1939 break; 1940 } 1941 1942 case OverlayMsg::typeDHTGet: { 1943 vector<Data> vect = dht->get(dhtMsg->getKey()); 1944 OverlayMsg omsg(*msg); 1945 omsg.swapRoles(); 1946 omsg.setType(OverlayMsg::typeDHTData); 1947 DHTMessage dhtmsg(dhtMsg->getKey(), vect); 1948 omsg.encapsulate(&dhtmsg); 1949 this->send(&omsg, omsg.getDestinationNode()); 1950 break; 1951 } 1952 1953 case OverlayMsg::typeDHTRemove: { 1954 if (dhtMsg->hasValues()) { 1955 BOOST_FOREACH( Data value, dhtMsg->getValues() ) 1956 dht->remove(dhtMsg->getKey(), value ); 1957 } else 1958 dht->remove( dhtMsg->getKey() ); 1959 break; 1960 } 1961 1962 default: 1963 logging_error("DHT Message type unknown."); 1964 return false; 1965 } 1966 delete msg; 1967 return true; 1968 } 1969 1970 /// put a value to the DHT with a ttl given in seconds 1971 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) { 1972 // calculate hash 1973 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8); 1974 OverlayMsg msg(OverlayMsg::typeDHTPut); 1975 DHTMessage dhtmsg(key,value); 1976 dhtmsg.setTTL(ttl); 1977 msg.setDestinationNode(dest); 1978 msg.encapsulate( &dhtmsg ); 1979 send(&msg, dest); 1980 } 1981 1982 /// removes a key value pair from the DHT 1983 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) { 1984 // calculate hash 1985 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8); 1986 OverlayMsg msg(OverlayMsg::typeDHTRemove); 1987 DHTMessage dhtmsg(key,value); 1988 msg.setDestinationNode(dest); 1989 msg.encapsulate( &dhtmsg ); 1990 send(&msg, dest); 1991 } 1992 1993 /// removes all data stored at the given key 1994 void BaseOverlay::dhtRemove( const Data& key ) { 1995 // calculate hash 1996 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8); 1997 OverlayMsg msg(OverlayMsg::typeDHTRemove); 1998 DHTMessage dhtmsg(key); 1999 msg.setDestinationNode(dest); 2000 msg.encapsulate( &dhtmsg ); 2001 send(&msg, dest); 2002 } 2003 2004 /// requests data stored using key 2005 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) { 2006 // calculate hash 2007 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8); 2008 OverlayMsg msg(OverlayMsg::typeDHTRemove); 2009 DHTMessage dhtmsg(key); 2010 msg.setDestinationNode(dest); 2011 msg.setService(service); 2012 msg.encapsulate( &dhtmsg ); 2013 send(&msg, dest); 2014 } 2015 2016 1701 2017 }} // namespace ariba, overlay -
source/ariba/overlay/BaseOverlay.h
r5916 r6266 122 122 class LinkDescriptor; 123 123 class OverlayMsg; 124 class DHT; 124 125 125 126 class BaseOverlay: public MessageReceiver, … … 284 285 void leaveSpoVNet(); 285 286 287 /// put a value to the DHT with a ttl given in seconds 288 void dhtPut( const Data& key, const Data& value, int ttl = 0); 289 290 /// removes a key value pair from the DHT 291 void dhtRemove( const Data& key, const Data& value ); 292 293 /// removes all data stored at the given key 294 void dhtRemove( const Data& key ); 295 296 /// requests data stored using key 297 void dhtGet( const Data& key, const ServiceID& service ); 298 286 299 protected: 287 288 300 /** 289 301 * @see ariba::communication::CommunicationEvents.h … … 397 409 bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink ); 398 410 411 // handle DHT messages 412 bool handleDHTMessage( OverlayMsg* msg ); 413 399 414 // handle link messages 400 415 bool handleLinkRequest( OverlayMsg* msg, LinkDescriptor* ld ); … … 403 418 bool handleLinkDirect( OverlayMsg* msg, LinkDescriptor* ld ); 404 419 bool handleLinkAlive( OverlayMsg* msg, LinkDescriptor* ld ); 420 405 421 406 422 // link state handling ----------------------------------------------------- … … 487 503 bool ignore_down = false ); 488 504 505 // distributed hashtable handling ------------------------------------------ 506 507 DHT* dht; 508 509 void stabilizeDHT(); 510 489 511 // misc -------------------------------------------------------------------- 490 512 -
source/ariba/overlay/messages/OverlayMsg.h
r5902 r6266 88 88 typeLinkDirect = 0x34, ///< direct connection has been established 89 89 typeLinkAlive = 0x35, ///< keep-alive message 90 91 // DHT routed messages 92 maskDHT = 0x40, ///< bit mask for dht messages 93 typeDHTPut = 0x41, ///< DHT put operation 94 typeDHTGet = 0x42, ///< DHT get operation 95 typeDHTRemove = 0x43, ///< DHT remove operation 96 97 /// DHT response messages 98 maskDHTResponse = 0x50, ///< bit mask for dht responses 99 typeDHTData = 0x51, ///< DHT get data 90 100 91 101 // topology signaling … … 179 189 } 180 190 181 182 191 bool containsSourceEndpoint() const { 183 192 return (flags & 0x20)!=0; … … 186 195 void setContainsSourceEndpoint(bool contains_endpoint) { 187 196 if (contains_endpoint) flags |= 0x20; else flags &= ~0x20; 197 } 198 199 bool isDHTMessage() const { 200 return hasTypeMask(maskDHT); 188 201 } 189 202 -
source/ariba/overlay/modules/OverlayInterface.h
r5870 r6266 116 116 virtual const EndpointDescriptor& resolveNode(const NodeID& node) = 0; 117 117 118 119 /** 120 * Returns true if this is the closest node to the given node 121 * identifier. 122 * 123 * @param node The node identifier to compare with 124 * @return True if this is the closest node to the given node identifier 125 */ 126 virtual bool isClosestNodeTo( const NodeID& node ) = 0; 127 118 128 /** 119 129 * Returns the nodes known to this overlay. -
source/ariba/overlay/modules/chord/Chord.cpp
r6198 r6266 181 181 if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); 182 182 return baseoverlay.getEndpointDescriptor(item->info); 183 } 184 185 /// @see OverlayInterface.h 186 bool Chord::isClosestNodeTo( const NodeID& node ) { 187 return table->is_closest_to(node); 183 188 } 184 189 -
source/ariba/overlay/modules/chord/Chord.h
r5876 r6266 123 123 124 124 /// @see OverlayInterface.h 125 virtual bool isClosestNodeTo( const NodeID& node ); 126 127 /// @see OverlayInterface.h 125 128 virtual NodeList getKnownNodes(bool deep = true) const; 126 129 -
source/ariba/overlay/modules/onehop/OneHop.cpp
r5743 r6266 79 79 } 80 80 81 82 /// @see OverlayInterface.h 83 bool OneHop::isClosestNodeTo( const NodeID& node ) { 84 throw "NOT IMPLEMENTED!"; 85 return false; 86 } 87 81 88 void OneHop::routeMessage(const NodeID& destnode, Message* msg){ 82 89 -
source/ariba/overlay/modules/onehop/OneHop.h
r5624 r6266 81 81 82 82 /// @see OverlayInterface.h 83 virtual bool isClosestNodeTo( const NodeID& node ); 84 85 /// @see OverlayInterface.h 83 86 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 84 87
Note:
See TracChangeset
for help on using the changeset viewer.