1 | // [License] |
---|
2 | // The Ariba-Underlay Copyright |
---|
3 | // |
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH) |
---|
5 | // |
---|
6 | // Institute of Telematics |
---|
7 | // UniversitÀt Karlsruhe (TH) |
---|
8 | // Zirkel 2, 76128 Karlsruhe |
---|
9 | // Germany |
---|
10 | // |
---|
11 | // Redistribution and use in source and binary forms, with or without |
---|
12 | // modification, are permitted provided that the following conditions are |
---|
13 | // met: |
---|
14 | // |
---|
15 | // 1. Redistributions of source code must retain the above copyright |
---|
16 | // notice, this list of conditions and the following disclaimer. |
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright |
---|
18 | // notice, this list of conditions and the following disclaimer in the |
---|
19 | // documentation and/or other materials provided with the distribution. |
---|
20 | // |
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND |
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR |
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | // |
---|
33 | // The views and conclusions contained in the software and documentation |
---|
34 | // are those of the authors and should not be interpreted as representing |
---|
35 | // official policies, either expressed or implied, of the Institute of |
---|
36 | // Telematics. |
---|
37 | // [License] |
---|
38 | |
---|
39 | #include "ariba/overlay/BaseOverlay.h" |
---|
40 | |
---|
41 | #include "Chord.h" |
---|
42 | #include "messages/ChordMessage.h" |
---|
43 | #include "messages/Discovery.h" |
---|
44 | |
---|
45 | #include "detail/chord_routing_table.hpp" |
---|
46 | |
---|
47 | namespace ariba { |
---|
48 | namespace overlay { |
---|
49 | |
---|
50 | typedef chord_routing_table::item route_item; |
---|
51 | |
---|
52 | use_logging_cpp( Chord ); |
---|
53 | |
---|
54 | Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, |
---|
55 | OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param) : |
---|
56 | OverlayInterface(_baseoverlay, _nodeid, _eventsReceiver, param) { |
---|
57 | |
---|
58 | // create routing table |
---|
59 | this->table = new chord_routing_table(_nodeid, 2); |
---|
60 | orphan_removal_counter = 0; |
---|
61 | stabilize_counter = 0; |
---|
62 | stabilize_finger = 0; |
---|
63 | bootstrapLink = LinkID::UNSPECIFIED; |
---|
64 | } |
---|
65 | |
---|
66 | Chord::~Chord() { |
---|
67 | |
---|
68 | // delete routing table |
---|
69 | delete table; |
---|
70 | } |
---|
71 | |
---|
72 | /// helper: sets up a link using the base overlay |
---|
73 | LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node) { |
---|
74 | |
---|
75 | logging_debug("request to setup link to " << endp.toString() ); |
---|
76 | |
---|
77 | for (size_t i=0; i<pending.size(); i++) |
---|
78 | if (pending[i]==node) return LinkID::UNSPECIFIED; |
---|
79 | pending.push_back(node); |
---|
80 | |
---|
81 | // establish link via base overlay |
---|
82 | return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID); |
---|
83 | } |
---|
84 | |
---|
85 | /// helper: sends a message using the "base overlay" |
---|
86 | seqnum_t Chord::send(Message* msg, const LinkID& link) { |
---|
87 | if (link.isUnspecified()) return 0; |
---|
88 | return baseoverlay.sendMessage(msg, link); |
---|
89 | } |
---|
90 | |
---|
91 | /// sends a discovery message |
---|
92 | void Chord::send_discovery_to(const NodeID& destination, int ttl) { |
---|
93 | // logging_debug("Initiating discovery of " << destination.toString() ); |
---|
94 | Message msg; |
---|
95 | ChordMessage cmsg(ChordMessage::discovery, nodeid, destination); |
---|
96 | Discovery dmsg; |
---|
97 | dmsg.setSourceEndpoint(&baseoverlay.getEndpointDescriptor()); |
---|
98 | dmsg.setFollowType(Discovery::normal); |
---|
99 | dmsg.setTTL((uint8_t) ttl); |
---|
100 | cmsg.encapsulate(&dmsg); |
---|
101 | msg.encapsulate(&cmsg); |
---|
102 | this->onMessage(&msg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); |
---|
103 | } |
---|
104 | |
---|
105 | void Chord::createOverlay() { |
---|
106 | } |
---|
107 | |
---|
108 | void Chord::deleteOverlay() { |
---|
109 | |
---|
110 | } |
---|
111 | |
---|
112 | void Chord::joinOverlay(const EndpointDescriptor& boot) { |
---|
113 | logging_info( "joining Chord overlay structure through end-point " << |
---|
114 | (boot == EndpointDescriptor::UNSPECIFIED ? |
---|
115 | "local" : boot.toString()) ); |
---|
116 | |
---|
117 | // initiator? no->setup first link |
---|
118 | if (!(boot == EndpointDescriptor::UNSPECIFIED)) |
---|
119 | bootstrapLink = setup(boot); |
---|
120 | |
---|
121 | // timer for stabilization management |
---|
122 | Timer::setInterval(2500); |
---|
123 | Timer::start(); |
---|
124 | } |
---|
125 | |
---|
126 | void Chord::leaveOverlay() { |
---|
127 | Timer::stop(); |
---|
128 | for (size_t i = 0; i < table->size(); i++) { |
---|
129 | route_item* it = (*table)[i]; |
---|
130 | ChordMessage msg(ChordMessage::leave, nodeid, it->id); |
---|
131 | send(&msg,it->info); |
---|
132 | } |
---|
133 | } |
---|
134 | |
---|
135 | const EndpointDescriptor& Chord::resolveNode(const NodeID& node) { |
---|
136 | const route_item* item = table->get(node); |
---|
137 | if (item == NULL || item->info.isUnspecified()) return EndpointDescriptor::UNSPECIFIED; |
---|
138 | return baseoverlay.getEndpointDescriptor(item->info); |
---|
139 | } |
---|
140 | |
---|
141 | void Chord::routeMessage(const NodeID& destnode, Message* msg) { |
---|
142 | // get next hop |
---|
143 | const route_item* item = table->get_next_hop(destnode); |
---|
144 | |
---|
145 | // message for this node? yes-> delegate to base overlay |
---|
146 | if (item->id == nodeid || destnode == nodeid) |
---|
147 | baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); |
---|
148 | |
---|
149 | else { // no-> send to next hop |
---|
150 | ChordMessage cmsg(ChordMessage::route, nodeid, destnode); |
---|
151 | cmsg.encapsulate(msg); |
---|
152 | send(&cmsg, item->info); |
---|
153 | } |
---|
154 | } |
---|
155 | |
---|
156 | /// @see OverlayInterface.h |
---|
157 | void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { |
---|
158 | logging_debug("Redirect over Chord to node id=" << node.toString() |
---|
159 | << " link id=" << link.toString() ); |
---|
160 | ChordMessage cmsg(ChordMessage::route, nodeid, node); |
---|
161 | cmsg.encapsulate(msg); |
---|
162 | send(&cmsg, link); |
---|
163 | } |
---|
164 | |
---|
165 | /// @see OverlayInterface.h |
---|
166 | const LinkID& Chord::getNextLinkId( const NodeID& id ) const { |
---|
167 | // get next hop |
---|
168 | const route_item* item = table->get_next_hop(id); |
---|
169 | |
---|
170 | // returns a unspecified id when this is itself |
---|
171 | if (item == NULL || item->id == nodeid) |
---|
172 | return LinkID::UNSPECIFIED; |
---|
173 | |
---|
174 | /// return routing info |
---|
175 | return item->info; |
---|
176 | } |
---|
177 | |
---|
178 | OverlayInterface::NodeList Chord::getKnownNodes() const { |
---|
179 | OverlayInterface::NodeList nodelist; |
---|
180 | for (size_t i = 0; i < table->size(); i++) |
---|
181 | if ((*table)[i]->ref_count != 0 |
---|
182 | && !(*table)[i]->info.isUnspecified()) |
---|
183 | nodelist.push_back((*table)[i]->id); |
---|
184 | return nodelist; |
---|
185 | } |
---|
186 | |
---|
187 | /// @see CommunicationListener.h |
---|
188 | /// @see OverlayInterface.h |
---|
189 | void Chord::onLinkUp(const LinkID& lnk, const NodeID& remote) { |
---|
190 | logging_debug("link_up: link=" << lnk.toString() << " remote=" << |
---|
191 | remote.toString() ); |
---|
192 | for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++) |
---|
193 | if (*i == remote) { |
---|
194 | pending.erase(i); |
---|
195 | break; |
---|
196 | } |
---|
197 | route_item* item = table->insert(remote); |
---|
198 | |
---|
199 | // item added to routing table? |
---|
200 | if (item != NULL) { // yes-> add to routing table |
---|
201 | logging_info("new routing neighbor: " << remote.toString() |
---|
202 | << " with link " << lnk.toString()); |
---|
203 | item->info = lnk; |
---|
204 | } else { // no-> add orphan entry to routing table |
---|
205 | logging_info("new orphan: " << remote.toString() |
---|
206 | << " with link " << lnk.toString()); |
---|
207 | table->insert_orphan(remote)->info = lnk; |
---|
208 | } |
---|
209 | |
---|
210 | if (!bootstrapLink.isUnspecified() && lnk == bootstrapLink) { |
---|
211 | send_discovery_to(nodeid); |
---|
212 | bootstrapLink = LinkID::UNSPECIFIED; |
---|
213 | } |
---|
214 | } |
---|
215 | |
---|
216 | /// @see CommunicationListener.h or @see OverlayInterface.h |
---|
217 | void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { |
---|
218 | logging_debug("link_down: link=" << lnk.toString() << " remote=" << |
---|
219 | remote.toString() ); |
---|
220 | |
---|
221 | // remove link from routing table |
---|
222 | route_item* item = table->get(remote); |
---|
223 | if (item!=NULL) item->info = LinkID::UNSPECIFIED; |
---|
224 | table->remove(remote); |
---|
225 | } |
---|
226 | |
---|
227 | /// @see CommunicationListener.h |
---|
228 | /// @see OverlayInterface.h |
---|
229 | void Chord::onMessage(const DataMessage& msg, const NodeID& remote, |
---|
230 | const LinkID& link) { |
---|
231 | |
---|
232 | // decode message |
---|
233 | typedef ChordMessage M; |
---|
234 | M* m = msg.getMessage()->convert<ChordMessage> (); |
---|
235 | if (m == NULL) return; |
---|
236 | |
---|
237 | // handle messages |
---|
238 | switch (m->getType()) { |
---|
239 | |
---|
240 | // invalid message |
---|
241 | case M::invalid: |
---|
242 | break; |
---|
243 | |
---|
244 | // route message with payload |
---|
245 | case M::route: { |
---|
246 | // find next hop |
---|
247 | const route_item* item = table->get_next_hop(m->getDestination()); |
---|
248 | |
---|
249 | // next hop == myself? |
---|
250 | if (m->getDestination() == nodeid) { // yes-> route to base overlay |
---|
251 | logging_debug("Send message to baseoverlay"); |
---|
252 | baseoverlay.incomingRouteMessage( m, item->info, remote ); |
---|
253 | } |
---|
254 | // no-> route to next hop |
---|
255 | else { |
---|
256 | logging_debug("Route chord message to " |
---|
257 | << item->id.toString() << " (destination=" << m->getDestination() << ")"); |
---|
258 | send(m, item->info); |
---|
259 | } |
---|
260 | break; |
---|
261 | } |
---|
262 | |
---|
263 | // discovery request |
---|
264 | case M::discovery: { |
---|
265 | // decapsulate message |
---|
266 | Discovery* dmsg = m->decapsulate<Discovery> (); |
---|
267 | logging_debug("received discovery message with" |
---|
268 | << " dest=" << m->getDestination().toString() |
---|
269 | << " ttl=" << (int)dmsg->getTTL() |
---|
270 | << " type=" << (int)dmsg->getFollowType() |
---|
271 | ); |
---|
272 | |
---|
273 | // check if source node can be added to routing table and setup link |
---|
274 | if (m->getSource() != nodeid && table->is_insertable(m->getSource())) |
---|
275 | setup(*dmsg->getSourceEndpoint(), m->getSource() ); |
---|
276 | |
---|
277 | // delegate discovery message |
---|
278 | switch (dmsg->getFollowType()) { |
---|
279 | |
---|
280 | // normal: route discovery message like every other message |
---|
281 | case Discovery::normal: |
---|
282 | // closest node? yes-> split to follow successor and predecessor |
---|
283 | if (table->is_closest_to(m->getDestination())) { |
---|
284 | |
---|
285 | if (table->get_successor() != NULL) { |
---|
286 | // send successor message |
---|
287 | ChordMessage cmsg_s(*m); |
---|
288 | Discovery dmsg_s(*dmsg); |
---|
289 | dmsg_s.setFollowType(Discovery::successor); |
---|
290 | cmsg_s.encapsulate(&dmsg_s); |
---|
291 | route_item* succ_item = table->get(*table->get_successor()); |
---|
292 | logging_debug("split: routing discovery message to successor " |
---|
293 | << succ_item->id.toString() ); |
---|
294 | send(&cmsg_s, succ_item->info); |
---|
295 | } |
---|
296 | |
---|
297 | // send predecessor message |
---|
298 | if (table->get_predesessor() != NULL) { |
---|
299 | ChordMessage cmsg_p(*m); |
---|
300 | Discovery dmsg_p(*dmsg); |
---|
301 | dmsg_p.setFollowType(Discovery::predecessor); |
---|
302 | cmsg_p.encapsulate(&dmsg_p); |
---|
303 | route_item* pred_item = table->get( |
---|
304 | *table->get_predesessor()); |
---|
305 | logging_debug("split: routing discovery message to predecessor " |
---|
306 | << pred_item->id.toString() ); |
---|
307 | send(&cmsg_p, pred_item->info); |
---|
308 | } |
---|
309 | } |
---|
310 | // no-> route message |
---|
311 | else { |
---|
312 | // find next hop |
---|
313 | const route_item* item = table->get_next_hop( |
---|
314 | m->getDestination()); |
---|
315 | if (item->id == nodeid) break; |
---|
316 | logging_debug("routing discovery message to " << |
---|
317 | item->id.toString() ); |
---|
318 | send(m, item->info); |
---|
319 | } |
---|
320 | break; |
---|
321 | |
---|
322 | // successor mode: follow the successor until TTL is zero |
---|
323 | case Discovery::successor: |
---|
324 | case Discovery::predecessor: { |
---|
325 | // time to live ended? yes-> stop routing |
---|
326 | if (dmsg->getTTL() == 0) break; |
---|
327 | |
---|
328 | // decrease time-to-live |
---|
329 | dmsg->setTTL(dmsg->getTTL() - 1); |
---|
330 | |
---|
331 | const route_item* item = NULL; |
---|
332 | if (dmsg->getFollowType() == Discovery::successor && |
---|
333 | table->get_successor() != NULL) { |
---|
334 | item = table->get(*table->get_successor()); |
---|
335 | } else { |
---|
336 | if (table->get_predesessor()!=NULL) |
---|
337 | item = table->get(*table->get_predesessor()); |
---|
338 | } |
---|
339 | if (item == NULL) break; |
---|
340 | logging_debug("routing discovery message to succ/pred " |
---|
341 | << item->id.toString() ); |
---|
342 | ChordMessage cmsg(*m); |
---|
343 | cmsg.encapsulate(dmsg); |
---|
344 | send(&cmsg, item->info); |
---|
345 | break; |
---|
346 | } |
---|
347 | } |
---|
348 | break; |
---|
349 | } |
---|
350 | |
---|
351 | // leave |
---|
352 | case M::leave: { |
---|
353 | if (link!=LinkID::UNSPECIFIED) { |
---|
354 | route_item* item = table->get(remote); |
---|
355 | if (item!=NULL) item->info = LinkID::UNSPECIFIED; |
---|
356 | table->remove(remote); |
---|
357 | baseoverlay.dropLink(link); |
---|
358 | } |
---|
359 | break; |
---|
360 | } |
---|
361 | } |
---|
362 | } |
---|
363 | |
---|
364 | void Chord::eventFunction() { |
---|
365 | stabilize_counter++; |
---|
366 | if (stabilize_counter == 3) { |
---|
367 | pending.clear(); |
---|
368 | size_t numNeighbors = 0; |
---|
369 | for (size_t i = 0; i < table->size(); i++) { |
---|
370 | route_item* it = (*table)[i]; |
---|
371 | if (it->ref_count != 0 && !it->info.isUnspecified()) numNeighbors++; |
---|
372 | } |
---|
373 | logging_info("Running stabilization: #links=" |
---|
374 | << table->size() << " #neighbors=" << numNeighbors ); |
---|
375 | stabilize_counter = 0; |
---|
376 | stabilize_finger = ((stabilize_finger+1) % table->get_finger_table_size() ); |
---|
377 | logging_debug("Sending discovery message to my neighbors and fingers"); |
---|
378 | const NodeID& disc1 = nodeid; |
---|
379 | const NodeID& disc2 = table->get_finger_table(stabilize_finger).get_compare().get_center(); |
---|
380 | send_discovery_to(disc1); |
---|
381 | if (disc1 != disc2) send_discovery_to(disc2); |
---|
382 | orphan_removal_counter++; |
---|
383 | if (orphan_removal_counter == 2) { |
---|
384 | logging_info("Running orphan removal"); |
---|
385 | orphan_removal_counter = 0; |
---|
386 | for (size_t i = 0; i < table->size(); i++) { |
---|
387 | route_item* it = (*table)[i]; |
---|
388 | if (it->ref_count == 0 && !it->info.isUnspecified()) { |
---|
389 | logging_info("Dropping orphaned link " << it->info.toString() << " to " << it->id.toString()); |
---|
390 | baseoverlay.dropLink(it->info); |
---|
391 | it->info = LinkID::UNSPECIFIED; |
---|
392 | } |
---|
393 | } |
---|
394 | } |
---|
395 | } |
---|
396 | logging_debug("--- chord routing information ----------------------------------"); |
---|
397 | logging_debug("predecessor: " << (table->get_predesessor()==NULL? "<none>" : |
---|
398 | table->get_predesessor()->toString()) ); |
---|
399 | logging_debug("node_id : " << nodeid.toString() ); |
---|
400 | logging_debug("successor : " << (table->get_successor()==NULL? "<none>" : |
---|
401 | table->get_successor()->toString())); |
---|
402 | logging_debug("----------------------------------------------------------------"); |
---|
403 | } |
---|
404 | |
---|
405 | }} // namespace ariba, overlay |
---|