1 | // [License]
2 | // The Ariba-Underlay Copyright
3 | //
4 | // Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5 | //
6 | // Institute of Telematics
7 | // UniversitÀt Karlsruhe (TH)
8 | // Zirkel 2, 76128 Karlsruhe
9 | // Germany
10 | //
11 | // Redistribution and use in source and binary forms, with or without
12 | // modification, are permitted provided that the following conditions are
13 | // met:
14 | //
15 | // 1. Redistributions of source code must retain the above copyright
16 | // notice, this list of conditions and the following disclaimer.
17 | // 2. Redistributions in binary form must reproduce the above copyright
18 | // notice, this list of conditions and the following disclaimer in the
19 | // documentation and/or other materials provided with the distribution.
20 | //
32 | //
33 | // The views and conclusions contained in the software and documentation
34 | // are those of the authors and should not be interpreted as representing
35 | // official policies, either expressed or implied, of the Institute of
36 | // Telematics.
37 | // [License]
38 |
39 | #include "BaseOverlay.h"
40 |
41 | #include <sstream>
42 | #include <iostream>
43 | #include <string>
44 | #include <boost/foreach.hpp>
45 |
46 | #include "ariba/NodeListener.h"
47 | #include "ariba/CommunicationListener.h"
48 | #include "ariba/SideportListener.h"
49 |
50 | #include "ariba/overlay/LinkDescriptor.h"
51 |
52 | #include "ariba/overlay/messages/OverlayMsg.h"
53 | #include "ariba/overlay/messages/DHTMessage.h"
54 | #include "ariba/overlay/messages/JoinRequest.h"
55 | #include "ariba/overlay/messages/JoinReply.h"
56 |
57 | #include "ariba/utility/visual/OvlVis.h"
58 | #include "ariba/utility/visual/DddVis.h"
59 | #include "ariba/utility/visual/ServerVis.h"
60 |
61 | namespace ariba {
62 | namespace overlay {
63 |
64 | #define visual ariba::utility::DddVis::instance()
65 | #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
66 | #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
67 |
68 | class ValueEntry {
69 | public:
70 | ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
71 | last_change(time(NULL)), value(value.clone()) {
72 | }
73 |
74 | ValueEntry( const ValueEntry& value ) :
75 | ttl(value.ttl), last_update(value.last_update),
76 | last_change(value.last_change), value(value.value.clone()) {
77 |
78 | }
79 |
80 | ~ValueEntry() {
81 | value.release();
82 | }
83 |
84 | void refresh() {
85 | last_update = time(NULL);
86 | }
87 |
88 | void set_value( const Data& value ) {
89 | this->value.release();
90 | this->value = value.clone();
91 | this->last_change = time(NULL);
92 | this->last_update = time(NULL);
93 | }
94 |
95 | Data get_value() const {
96 | return value;
97 | }
98 |
99 | uint16_t get_ttl() const {
100 | return ttl;
101 | }
102 |
103 | void set_ttl( uint16_t ttl ) {
104 | this->ttl = ttl;
105 | }
106 |
107 | bool is_ttl_elapsed() const {
108 | // is persistent? yes-> always return false
109 | if (ttl==0) return false;
110 |
111 | // return true, if ttl is elapsed
112 | return ( difftime( time(NULL), this->last_update ) >= ttl );
113 | }
114 |
115 | private:
116 | uint16_t ttl;
117 | time_t last_update;
118 | time_t last_change;
119 | Data value;
120 | };
121 |
122 | class DHTEntry {
123 | public:
124 | Data key;
125 | vector<ValueEntry> values;
126 |
127 | vector<Data> get_values() {
128 | vector<Data> vect;
129 | BOOST_FOREACH( ValueEntry& e, values )
130 | vect.push_back( e.get_value() );
131 | return vect;
132 | }
133 |
134 | void erase_expired_entries() {
135 | for (vector<ValueEntry>::iterator i = values.begin();
136 | i != values.end(); i++ )
137 | if (i->is_ttl_elapsed()) i = values.erase(i);
138 | }
139 | };
140 |
141 | class DHT {
142 | public:
143 | typedef vector<DHTEntry> Entries;
144 | typedef vector<ValueEntry> Values;
145 | Entries entries;
146 | static const bool verbose = false;
147 |
148 | static bool equals( const Data& lhs, const Data& rhs ) {
149 | if (rhs.getLength()!=lhs.getLength()) return false;
150 | for (int i=0; i<lhs.getLength()/8; i++)
151 | if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
152 | return true;
153 | }
154 |
155 | void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
156 |
157 | // find entry
158 | for (size_t i=0; i<entries.size(); i++) {
159 | DHTEntry& entry = entries.at(i);
160 |
161 | // check if key is already known
162 | if ( equals(entry.key, key) ) {
163 |
164 | // check if value is already in values list
165 | for (size_t j=0; j<entry.values.size(); j++) {
166 | // found value already? yes-> refresh ttl
167 | if ( equals(entry.values[j].get_value(), value) ) {
168 | entry.values[j].refresh();
169 | if (verbose)
170 | std::cout << "DHT: Republished value. Refreshing value timestamp."
171 | << std::endl;
172 | return;
173 | }
174 | }
175 |
176 | // new value-> add to entry
177 | if (verbose)
178 | std::cout << "DHT: Added value to "
179 | << " key=" << key << " with value=" << value << std::endl;
180 | entry.values.push_back( ValueEntry( value ) );
181 | entry.values.back().set_ttl(ttl);
182 | return;
183 | }
184 | }
185 |
186 | // key is unknown-> add key value pair
187 | if (verbose)
188 | std::cout << "DHT: New key value pair "
189 | << " key=" << key << " with value=" << value << std::endl;
190 |
191 | // add new entry
192 | entries.push_back( DHTEntry() );
193 | DHTEntry& entry = entries.back();
194 | entry.key = key.clone();
195 | entry.values.push_back( ValueEntry(value) );
196 | entry.values.back().set_ttl(ttl);
197 | }
198 |
199 | vector<Data> get( const Data& key ) {
200 | // find entry
201 | for (size_t i=0; i<entries.size(); i++) {
202 | DHTEntry& entry = entries.at(i);
203 | if ( equals(entry.key,key) )
204 | return entry.get_values();
205 | }
206 | return vector<Data>();
207 | }
208 |
209 | bool remove( const Data& key ) {
210 | // find entry
211 | for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
212 | DHTEntry& entry = *i;
213 |
214 | // found? yes-> delete entry
215 | if ( equals(entry.key, key) ) {
216 | i = entries.erase(i);
217 | return true;
218 | }
219 | }
220 | return false;
221 | }
222 |
223 | bool remove( const Data& key, const Data& value ) {
224 | // find entry
225 | for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
226 | DHTEntry& entry = *i;
227 |
228 | // found? yes-> try to find value
229 | if ( equals(entry.key, key) ) {
230 | for (Values::iterator j = entry.values.begin();
231 | j != entry.values.end(); j++) {
232 |
233 | // value found? yes-> delete
234 | if (equals(j->get_value(), value)) {
235 | j = entry.values.erase(j);
236 | return true;
237 | }
238 | }
239 | }
240 | }
241 | return false;
242 | }
243 |
244 | void cleanup() {
245 | // find entry
246 | for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
247 | DHTEntry& entry = *i;
248 |
249 | for (Values::iterator j = entry.values.begin();
250 | j != entry.values.end(); j++) {
251 |
252 | // value found? yes-> delete
253 | if (j->is_ttl_elapsed())
254 | j = entry.values.erase(j);
255 | }
256 |
257 | if (entry.values.size()==0) i = entries.erase(i);
258 | }
259 | }
260 | };
261 |
262 | // ----------------------------------------------------------------------------
263 |
264 | /* *****************************************************************************
266 | * ****************************************************************************/
267 |
268 | CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
269 | if( !communicationListeners.contains( service ) ) {
270 | logging_error( "No listener found for service " << service.toString() );
271 | return NULL;
272 | }
273 | CommunicationListener* listener = communicationListeners.get( service );
274 | assert( listener != NULL );
275 | return listener;
276 | }
277 |
278 | // link descriptor handling ----------------------------------------------------
279 |
280 | LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
281 | BOOST_FOREACH( LinkDescriptor* lp, links )
282 | if ((communication ? lp->communicationId : lp->overlayId) == link)
283 | return lp;
284 | return NULL;
285 | }
286 |
287 | const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
288 | BOOST_FOREACH( const LinkDescriptor* lp, links )
289 | if ((communication ? lp->communicationId : lp->overlayId) == link)
290 | return lp;
291 | return NULL;
292 | }
293 |
294 | /// erases a link descriptor
295 | void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
296 | for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
297 | LinkDescriptor* ld = *i;
298 | if ((communication ? ld->communicationId : ld->overlayId) == link) {
299 | delete ld;
300 | links.erase(i);
301 | break;
302 | }
303 | }
304 | }
305 |
306 | /// adds a link descriptor
307 | LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
308 | LinkDescriptor* desc = getDescriptor( link );
309 | if ( desc == NULL ) {
310 | desc = new LinkDescriptor();
311 | if (!link.isUnspecified()) desc->overlayId = link;
312 | links.push_back(desc);
313 | }
314 | return desc;
315 | }
316 |
317 | /// returns a auto-link descriptor
318 | LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
319 | // search for a descriptor that is already up
320 | BOOST_FOREACH( LinkDescriptor* lp, links )
321 | if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
322 | return lp;
323 | // if not found, search for one that is about to come up...
324 | BOOST_FOREACH( LinkDescriptor* lp, links )
325 | if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
326 | return lp;
327 | return NULL;
328 | }
329 |
330 | /// stabilizes link information
331 | void BaseOverlay::stabilizeLinks() {
332 | // send keep-alive messages over established links
333 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
334 | if (!ld->up) continue;
335 | OverlayMsg msg( OverlayMsg::typeLinkAlive,
336 | OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
337 | if (ld->relayed) msg.setRouteRecord(true);
338 | send_link( &msg, ld->overlayId );
339 | }
340 |
341 | // iterate over all links and check for time boundaries
342 | vector<LinkDescriptor*> oldlinks;
343 | time_t now = time(NULL);
344 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
345 |
346 | // keep alives and not up? yes-> link connection request is stale!
347 | if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
348 |
349 | // increase counter
350 | ld->keepAliveMissed++;
351 |
352 | // missed more than four keep-alive messages (10 sec)? -> drop link
353 | if (ld->keepAliveMissed > 4) {
354 | logging_info( "Link connection request is stale, closing: " << ld );
355 | oldlinks.push_back( ld );
356 | continue;
357 | }
358 | }
359 |
360 | if (!ld->up) continue;
361 |
362 | // remote used as relay flag
363 | if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
364 | ld->relaying = false;
365 |
366 | // drop links that are dropped and not used as relay
367 | if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
368 | oldlinks.push_back( ld );
369 | continue;
370 | }
371 |
372 | // auto-link time exceeded?
373 | if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
374 | oldlinks.push_back( ld );
375 | continue;
376 | }
377 |
378 | // keep alives missed? yes->
379 | if ( difftime( now, ld->keepAliveTime ) > 2 ) {
380 |
381 | // increase counter
382 | ld->keepAliveMissed++;
383 |
384 | // missed more than four keep-alive messages (4 sec)? -> drop link
385 | if (ld->keepAliveMissed >= 4) {
386 | logging_info( "Link is stale, closing: " << ld );
387 | oldlinks.push_back( ld );
388 | continue;
389 | }
390 | }
391 | }
392 |
393 | // drop links
394 | BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
395 | logging_info( "Link timed out. Dropping " << ld );
396 | ld->relaying = false;
397 | dropLink( ld->overlayId );
398 | }
399 |
400 | // show link state
401 | counter++;
402 | if (counter>=4) showLinks();
403 | if (counter>=4 || counter<0) counter = 0;
404 | }
405 |
406 |
407 | std::string BaseOverlay::getLinkHTMLInfo() {
408 | std::ostringstream s;
409 | vector<NodeID> nodes;
410 | if (links.size()==0) {
411 | s << "<h2 style=\"color=#606060\">No links established!</h2>";
412 | } else {
413 | s << "<h2 style=\"color=#606060\">Links</h2>";
414 | s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
415 | s << "<tr style=\"background-color=#ffe0e0\">";
416 | s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
417 | s << "</tr>";
418 |
419 | int i=0;
420 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
421 | if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
422 | bool found = false;
423 | BOOST_FOREACH(NodeID& id, nodes)
424 | if (id == ld->remoteNode) found = true;
425 | if (found) continue;
426 | i++;
427 | nodes.push_back(ld->remoteNode);
428 | if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
429 | else s << "<tr>";
430 | s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
431 | s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
432 | s << "<td>";
433 | if (ld->routeRecord.size()>1 && ld->relayed) {
434 | for (size_t i=1; i<ld->routeRecord.size(); i++)
435 | s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
436 | } else {
437 | s << "Direct";
438 | }
439 | s << "</td>";
440 | s << "</tr>";
441 | }
442 | s << "</table>";
443 | }
444 | return s.str();
445 | }
446 |
447 | /// shows the current link state
448 | void BaseOverlay::showLinks() {
449 | int i=0;
450 | logging_info("--- link state -------------------------------");
451 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
452 | logging_info("link " << i << ": " << ld);
453 | i++;
454 | }
455 | logging_info("----------------------------------------------");
456 | }
457 |
458 | /// compares two arbitrary links to the same node
459 | int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
460 | LinkDescriptor* lhsld = getDescriptor(lhs);
461 | LinkDescriptor* rhsld = getDescriptor(rhs);
462 | if (lhsld==NULL || rhsld==NULL
463 | || !lhsld->up || !rhsld->up
464 | || lhsld->remoteNode != rhsld->remoteNode) return -1;
465 |
466 | if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
467 | return -1;
468 |
469 | return 1;
470 | }
471 |
472 |
473 | // internal message delivery ---------------------------------------------------
474 |
475 | /// routes a message to its destination node
476 | void BaseOverlay::route( OverlayMsg* message ) {
477 |
478 | // exceeded time-to-live? yes-> drop message
479 | if (message->getNumHops() > message->getTimeToLive()) {
480 | logging_warn("Message exceeded TTL. Dropping message and relay routes"
481 | "for recovery.");
482 | removeRelayNode(message->getDestinationNode());
483 | return;
484 | }
485 |
486 | // no-> forward message
487 | else {
488 | // destinastion myself? yes-> handle message
489 | if (message->getDestinationNode() == nodeId) {
490 | logging_warn("Usually I should not route messages to myself!");
491 | Message msg;
492 | msg.encapsulate(message);
493 | handleMessage( &msg, NULL );
494 | } else {
495 | // no->send message to next hop
496 | send( message, message->getDestinationNode() );
497 | }
498 | }
499 | }
500 |
501 | /// sends a message to another node, delivers it to the base overlay class
502 | seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
503 | LinkDescriptor* next_link = NULL;
504 |
505 | // drop messages to unspecified destinations
506 | if (destination.isUnspecified()) return -1;
507 |
508 | // send messages to myself -> handle message and drop warning!
509 | if (destination == nodeId) {
510 | logging_warn("Sent message to myself. Handling message.")
511 | Message msg;
512 | msg.encapsulate(message);
513 | handleMessage( &msg, NULL );
514 | return -1;
515 | }
516 |
517 | // use relay path?
518 | if (message->isRelayed()) {
519 | next_link = getRelayLinkTo( destination );
520 | if (next_link != NULL) {
521 | next_link->setRelaying();
522 | return bc->sendMessage(next_link->communicationId, message);
523 | } else {
524 | logging_warn("Could not send message. No relay hop found to "
525 | << destination)
526 | return -1;
527 | }
528 | }
529 |
530 | // routed message
531 | else {
532 | // no-> relay path! route over overlay path
533 | LinkID next_id = overlayInterface->getNextLinkId( destination );
534 | if (next_id.isUnspecified()) {
535 | logging_warn("Could not send message. No next hop found to " <<
536 | destination );
537 | return -1;
538 | }
539 |
540 | // get link descriptor, up and running? yes-> send message
541 | next_link = getDescriptor(next_id);
542 | if (next_link != NULL && next_link->up) {
543 | // send message over relayed link
544 | return send(message, next_link);
545 | }
546 |
547 | // no-> error, dropping message
548 | else {
549 | logging_warn("Could not send message. Link not known or up");
550 | return -1;
551 | }
552 | }
553 |
554 | // not reached-> fail
555 | return -1;
556 | }
557 |
558 | /// send a message using a link descriptor, delivers it to the base overlay class
559 | seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
560 | // check if null
561 | if (ldr == NULL) {
562 | logging_error("Can not send message to " << message->getDestinationAddress());
563 | return -1;
564 | }
565 |
566 | // check if up
567 | if (!ldr->up && !ignore_down) {
568 | logging_error("Can not send message. Link not up:" << ldr );
569 | return -1;
570 | }
571 | LinkDescriptor* ld = NULL;
572 |
573 | // handle relayed link
574 | if (ldr->relayed) {
575 | logging_debug("Resolving direct link for relayed link to "
576 | << ldr->remoteNode);
577 | ld = getRelayLinkTo( ldr->remoteNode );
578 | if (ld==NULL) {
579 | logging_error("No relay path found to link " << ldr );
580 | return -1;
581 | }
582 | ld->setRelaying();
583 | message->setRelayed(true);
584 | } else
585 | ld = ldr;
586 |
587 | // handle direct link
588 | if (ld->communicationUp) {
589 | logging_debug("send(): Sending message over direct link.");
590 | return bc->sendMessage( ld->communicationId, message );
591 | } else {
592 | logging_error("send(): Could not send message. "
593 | "Not a relayed link and direct link is not up.");
594 | return -1;
595 | }
596 | return -1;
597 | }
598 |
599 | seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
600 | const ServiceID& service) {
601 | message->setSourceNode(nodeId);
602 | message->setDestinationNode(remote);
603 | message->setService(service);
604 | send( message, remote );
605 | }
606 |
607 | seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
608 | LinkDescriptor* ld = getDescriptor(link);
609 | if (ld==NULL) {
610 | logging_error("Cannot find descriptor to link id=" << link.toString());
611 | return -1;
612 | }
613 | message->setSourceNode(nodeId);
614 | message->setDestinationNode(ld->remoteNode);
615 |
616 | message->setSourceLink(ld->overlayId);
617 | message->setDestinationLink(ld->remoteLink);
618 |
619 | message->setService(ld->service);
620 | message->setRelayed(ld->relayed);
621 | return send( message, ld, ignore_down );
622 | }
623 |
624 | // relay route management ------------------------------------------------------
625 |
626 | /// stabilize relay information
627 | void BaseOverlay::stabilizeRelays() {
628 | vector<relay_route>::iterator i = relay_routes.begin();
629 | while (i!=relay_routes.end() ) {
630 | relay_route& route = *i;
631 | LinkDescriptor* ld = getDescriptor(route.link);
632 |
633 | // relay link still used and alive?
634 | if (ld==NULL
635 | || !ld->isDirectVital()
636 | || difftime(route.used, time(NULL)) > 8) {
637 | logging_info("Forgetting relay information to node "
638 | << route.node.toString() );
639 | i = relay_routes.erase(i);
640 | } else
641 | i++;
642 | }
643 | }
644 |
645 | void BaseOverlay::removeRelayLink( const LinkID& link ) {
646 | vector<relay_route>::iterator i = relay_routes.begin();
647 | while (i!=relay_routes.end() ) {
648 | relay_route& route = *i;
649 | if (route.link == link ) i = relay_routes.erase(i); else i++;
650 | }
651 | }
652 |
653 | void BaseOverlay::removeRelayNode( const NodeID& remote ) {
654 | vector<relay_route>::iterator i = relay_routes.begin();
655 | while (i!=relay_routes.end() ) {
656 | relay_route& route = *i;
657 | if (route.node == remote ) i = relay_routes.erase(i); else i++;
658 | }
659 | }
660 |
661 | /// refreshes relay information
662 | void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
663 |
664 | // handle relayed messages from real links only
665 | if (ld == NULL
666 | || ld->relayed
667 | || message->getSourceNode()==nodeId ) return;
668 |
669 | // update usage information
670 | if (message->isRelayed()) {
671 | // try to find source node
672 | BOOST_FOREACH( relay_route& route, relay_routes ) {
673 | // relay route found? yes->
674 | if ( route.node == message->getDestinationNode() ) {
675 | ld->setRelaying();
676 | route.used = time(NULL);
677 | }
678 | }
679 |
680 | }
681 |
682 | // register relay path
683 | if (message->isRegisterRelay()) {
684 | // set relaying
685 | ld->setRelaying();
686 |
687 | // try to find source node
688 | BOOST_FOREACH( relay_route& route, relay_routes ) {
689 |
690 | // relay route found? yes->
691 | if ( route.node == message->getSourceNode() ) {
692 |
693 | // refresh timer
694 | route.used = time(NULL);
695 | LinkDescriptor* rld = getDescriptor(route.link);
696 |
697 | // route has a shorter hop count or old link is dead? yes-> replace
698 | if (route.hops > message->getNumHops()
699 | || rld == NULL
700 | || !rld->isDirectVital()) {
701 | logging_info("Updating relay information to node "
702 | << route.node.toString()
703 | << " reducing to " << message->getNumHops() << " hops.");
704 | route.hops = message->getNumHops();
705 | route.link = ld->overlayId;
706 | }
707 | return;
708 | }
709 | }
710 |
711 | // not found-> add new entry
712 | relay_route route;
713 | route.hops = message->getNumHops();
714 | route.link = ld->overlayId;
715 | route.node = message->getSourceNode();
716 | route.used = time(NULL);
717 | logging_info("Remembering relay information to node "
718 | << route.node.toString());
719 | relay_routes.push_back(route);
720 | }
721 | }
722 |
723 | /// returns a known "vital" relay link which is up and running
724 | LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
725 | // try to find source node
726 | BOOST_FOREACH( relay_route& route, relay_routes ) {
727 | if (route.node == remote ) {
728 | LinkDescriptor* ld = getDescriptor( route.link );
729 | if (ld==NULL || !ld->isDirectVital()) return NULL; else {
730 | route.used = time(NULL);
731 | return ld;
732 | }
733 | }
734 | }
735 | return NULL;
736 | }
737 |
738 | /* *****************************************************************************
740 | * ****************************************************************************/
741 |
742 | use_logging_cpp(BaseOverlay);
743 |
744 | // ----------------------------------------------------------------------------
745 |
746 | BaseOverlay::BaseOverlay() :
747 | bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
748 | spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
749 | sideport(&SideportListener::DEFAULT), started(false), counter(0) {
750 | dht = new DHT();
751 | localDHT = new DHT();
752 | }
753 |
754 | BaseOverlay::~BaseOverlay() {
755 | delete dht;
756 | }
757 |
758 | // ----------------------------------------------------------------------------
759 |
760 | void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
761 | logging_info("Starting...");
762 |
763 | // set parameters
764 | bc = &_basecomm;
765 | nodeId = _nodeid;
766 |
767 | // register at base communication
768 | bc->registerMessageReceiver( this );
769 | bc->registerEventListener( this );
770 |
771 | // timer for auto link management
772 | Timer::setInterval( 1000 );
773 | Timer::start();
774 |
775 | started = true;
776 | state = BaseOverlayStateInvalid;
777 | }
778 |
779 | void BaseOverlay::stop() {
780 | logging_info("Stopping...");
781 |
782 | // stop timer
783 | Timer::stop();
784 |
785 | // delete oberlay interface
786 | if(overlayInterface != NULL) {
787 | delete overlayInterface;
788 | overlayInterface = NULL;
789 | }
790 |
791 | // unregister at base communication
792 | bc->unregisterMessageReceiver( this );
793 | bc->unregisterEventListener( this );
794 |
795 | started = false;
796 | state = BaseOverlayStateInvalid;
797 | }
798 |
799 | bool BaseOverlay::isStarted(){
800 | return started;
801 | }
802 |
803 | // ----------------------------------------------------------------------------
804 |
805 | void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
806 | const EndpointDescriptor& bootstrapEp) {
807 |
808 | if(id != spovnetId){
809 | logging_error("attempt to join against invalid spovnet, call initiate first");
810 | return;
811 | }
812 |
813 |
814 | //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
815 | logging_info( "Starting to join spovnet " << id.toString() <<
816 | " with nodeid " << nodeId.toString());
817 |
818 | if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
819 |
820 | // bootstrap against ourselfs
821 | logging_info("joining spovnet locally");
822 |
823 | overlayInterface->joinOverlay();
824 | state = BaseOverlayStateCompleted;
825 | BOOST_FOREACH( NodeListener* i, nodeListeners )
826 | i->onJoinCompleted( spovnetId );
827 |
828 | //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
829 | //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
830 |
831 | logging_debug("starting overlay bootstrap module");
832 | overlayBootstrap.start(this, spovnetId, nodeId);
833 | overlayBootstrap.publish(bc->getEndpointDescriptor());
834 |
835 | } else {
836 |
837 | // bootstrap against another node
838 | logging_info("joining spovnet remotely against " << bootstrapEp.toString());
839 |
840 | const LinkID& lnk = bc->establishLink( bootstrapEp );
841 | bootstrapLinks.push_back(lnk);
842 | logging_info("join process initiated for " << id.toString() << "...");
843 | }
844 | }
845 |
846 | void BaseOverlay::leaveSpoVNet() {
847 |
848 | logging_info( "Leaving spovnet " << spovnetId );
849 | bool ret = ( state != this->BaseOverlayStateInvalid );
850 |
851 | logging_debug("stopping overlay bootstrap module");
852 | overlayBootstrap.stop();
853 | overlayBootstrap.revoke();
854 |
855 | logging_debug( "Dropping all auto-links" );
856 |
857 | // gather all service links
858 | vector<LinkID> servicelinks;
859 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
860 | if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
861 | servicelinks.push_back( ld->overlayId );
862 | }
863 |
864 | // drop all service links
865 | BOOST_FOREACH( LinkID lnk, servicelinks )
866 | dropLink( lnk );
867 |
868 | // let the node leave the spovnet overlay interface
869 | logging_debug( "Leaving overlay" );
870 | if( overlayInterface != NULL )
871 | overlayInterface->leaveOverlay();
872 |
873 | // drop still open bootstrap links
874 | BOOST_FOREACH( LinkID lnk, bootstrapLinks )
875 | bc->dropLink( lnk );
876 |
877 | // change to inalid state
878 | state = BaseOverlayStateInvalid;
879 | //ovl.visShutdown( ovlId, nodeId, string("") );
880 |
881 | visual.visShutdown(visualIdOverlay, nodeId, "");
882 | visual.visShutdown(visualIdBase, nodeId, "");
883 |
884 | // inform all registered services of the event
885 | BOOST_FOREACH( NodeListener* i, nodeListeners ) {
886 | if( ret ) i->onLeaveCompleted( spovnetId );
887 | else i->onLeaveFailed( spovnetId );
888 | }
889 | }
890 |
891 | void BaseOverlay::createSpoVNet(const SpoVNetID& id,
892 | const OverlayParameterSet& param,
893 | const SecurityParameterSet& sec,
894 | const QoSParameterSet& qos) {
895 |
896 | // set the state that we are an initiator, this way incoming messages are
897 | // handled correctly
898 | logging_info( "creating spovnet " + id.toString() <<
899 | " with nodeid " << nodeId.toString() );
900 |
901 | spovnetId = id;
902 |
903 | overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
904 | if( overlayInterface == NULL ) {
905 | logging_fatal( "overlay structure not supported" );
906 | state = BaseOverlayStateInvalid;
907 |
908 | BOOST_FOREACH( NodeListener* i, nodeListeners )
909 | i->onJoinFailed( spovnetId );
910 |
911 | return;
912 | }
913 |
914 | //visual.configure("", 50005);
915 | //visual.configure("", 50005);
916 | visual.visCreate(visualIdBase, nodeId, "", "");
917 | visual.visCreate(visualIdOverlay, nodeId, "", "");
918 | }
919 |
920 | // ----------------------------------------------------------------------------
921 |
922 | const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
923 | const NodeID& remoteId, const ServiceID& service ) {
924 |
925 | // establish link via overlay
926 | if (!remoteId.isUnspecified())
927 | return establishLink( remoteId, service );
928 | else
929 |
930 | // establish link directly if only ep is known
931 | if (remoteId.isUnspecified())
932 | return establishDirectLink(remoteEp, service );
933 |
934 | }
935 |
936 | /// call base communication's establish link and add link mapping
937 | const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
938 | const ServiceID& service ) {
939 |
940 | /// find a service listener
941 | if( !communicationListeners.contains( service ) ) {
942 | logging_error( "No listener registered for service id=" << service.toString() );
943 | return LinkID::UNSPECIFIED;
944 | }
945 | CommunicationListener* listener = communicationListeners.get( service );
946 | assert( listener != NULL );
947 |
948 | // create descriptor
949 | LinkDescriptor* ld = addDescriptor();
950 | ld->relayed = false;
951 | ld->listener = listener;
952 | ld->service = service;
953 | ld->communicationId = bc->establishLink( ep );
954 |
955 | /// establish link and add mapping
956 | logging_info("Establishing direct link " << ld->communicationId.toString()
957 | << " using " << ep.toString());
958 |
959 | return ld->communicationId;
960 | }
961 |
962 | /// establishes a link between two arbitrary nodes
963 | const LinkID BaseOverlay::establishLink( const NodeID& remote,
964 | const ServiceID& service ) {
965 |
966 | // do not establish a link to myself!
967 | if (remote == nodeId) return LinkID::UNSPECIFIED;
968 |
969 | // create a link descriptor
970 | LinkDescriptor* ld = addDescriptor();
971 | ld->relayed = true;
972 | ld->remoteNode = remote;
973 | ld->service = service;
974 | ld->listener = getListener(ld->service);
975 |
976 | // create link request message
977 | OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
978 | msg.setSourceLink(ld->overlayId);
979 | msg.setRelayed(false);
980 | msg.setRegisterRelay(true);
981 | // msg.setRelayed(true);
982 |
983 | // debug message
984 | logging_info(
985 | "Sending link request with"
986 | << " link=" << ld->overlayId.toString()
987 | << " node=" << ld->remoteNode.toString()
988 | << " serv=" << ld->service.toString()
989 | );
990 |
991 | // sending message to node
992 | send_node( &msg, ld->remoteNode, ld->service );
993 |
994 | return ld->overlayId;
995 | }
996 |
997 | /// drops an established link
998 | void BaseOverlay::dropLink(const LinkID& link) {
999 | logging_info( "Dropping link (initiated locally):" << link.toString() );
1000 |
1001 | // find the link item to drop
1002 | LinkDescriptor* ld = getDescriptor(link);
1003 | if( ld == NULL ) {
1004 | logging_warn( "Can't drop link, link is unknown!");
1005 | return;
1006 | }
1007 |
1008 | // delete all queued messages
1009 | if( ld->messageQueue.size() > 0 ) {
1010 | logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
1011 | << ld->messageQueue.size() << " waiting messages" );
1012 | ld->flushQueue();
1013 | }
1014 |
1015 | // inform sideport and listener
1016 | if(ld->listener != NULL)
1017 | ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
1018 | sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
1019 |
1020 | // do not drop relay links
1021 | if (!ld->relaying) {
1022 | // drop the link in base communication
1023 | if (ld->communicationUp) bc->dropLink( ld->communicationId );
1024 |
1025 | // erase descriptor
1026 | eraseDescriptor( ld->overlayId );
1027 | } else {
1028 | ld->dropAfterRelaying = true;
1029 | }
1030 | }
1031 |
1032 | // ----------------------------------------------------------------------------
1033 |
1034 | /// internal send message, always use this functions to send messages over links
1035 | seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
1036 | logging_debug( "Sending data message on link " << link.toString() );
1037 |
1038 | // get the mapping for this link
1039 | LinkDescriptor* ld = getDescriptor(link);
1040 | if( ld == NULL ) {
1041 | logging_error("Could not send message. "
1042 | << "Link not found id=" << link.toString());
1043 | return -1;
1044 | }
1045 |
1046 | // check if the link is up yet, if its an auto link queue message
1047 | if( !ld->up ) {
1048 | ld->setAutoUsed();
1049 | if( ld->autolink ) {
1050 | logging_info("Auto-link " << link.toString() << " not up, queue message");
1051 | Data data = data_serialize( message );
1052 | const_cast<Message*>(message)->dropPayload();
1053 | ld->messageQueue.push_back( new Message(data) );
1054 | } else {
1055 | logging_error("Link " << link.toString() << " not up, drop message");
1056 | }
1057 | return -1;
1058 | }
1059 |
1060 | // compile overlay message (has service and node id)
1061 | OverlayMsg overmsg( OverlayMsg::typeData );
1062 | overmsg.encapsulate( const_cast<Message*>(message) );
1063 |
1064 | // send message over relay/direct/overlay
1065 | return send_link( &overmsg, ld->overlayId );
1066 | }
1067 |
1068 | seqnum_t BaseOverlay::sendMessage(const Message* message,
1069 | const NodeID& node, const ServiceID& service) {
1070 |
1071 | // find link for node and service
1072 | LinkDescriptor* ld = getAutoDescriptor( node, service );
1073 |
1074 | // if we found no link, create an auto link
1075 | if( ld == NULL ) {
1076 |
1077 | // debug output
1078 | logging_info( "No link to send message to node "
1079 | << node.toString() << " found for service "
1080 | << service.toString() << ". Creating auto link ..."
1081 | );
1082 |
1083 | // call base overlay to create a link
1084 | LinkID link = establishLink( node, service );
1085 | ld = getDescriptor( link );
1086 | if( ld == NULL ) {
1087 | logging_error( "Failed to establish auto-link.");
1088 | return -1;
1089 | }
1090 | ld->autolink = true;
1091 |
1092 | logging_debug( "Auto-link establishment in progress to node "
1093 | << node.toString() << " with link id=" << link.toString() );
1094 | }
1095 | assert(ld != NULL);
1096 |
1097 | // mark the link as used, as we now send a message through it
1098 | ld->setAutoUsed();
1099 |
1100 | // send / queue message
1101 | return sendMessage( message, ld->overlayId );
1102 | }
1103 |
1104 | // ----------------------------------------------------------------------------
1105 |
1106 | const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1107 | const LinkID& link) const {
1108 |
1109 | // return own end-point descriptor
1110 | if( link == LinkID::UNSPECIFIED )
1111 | return bc->getEndpointDescriptor();
1112 |
1113 | // find link descriptor. not found -> return unspecified
1114 | const LinkDescriptor* ld = getDescriptor(link);
1115 | if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1116 |
1117 | // return endpoint-descriptor from base communication
1118 | return bc->getEndpointDescriptor( ld->communicationId );
1119 | }
1120 |
1121 | const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1122 | const NodeID& node) const {
1123 |
1124 | // return own end-point descriptor
1125 | if( node == nodeId || node == NodeID::UNSPECIFIED )
1126 | return bc->getEndpointDescriptor();
1127 |
1128 | // no joined and request remote descriptor? -> fail!
1129 | if( overlayInterface == NULL ) {
1130 | logging_error( "overlay interface not set, cannot resolve endpoint" );
1131 | return EndpointDescriptor::UNSPECIFIED();
1132 | }
1133 |
1134 | // resolve end-point descriptor from the base-overlay routing table
1135 | const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1136 | if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1137 |
1138 | // see if we can find the node in our own table
1139 | BOOST_FOREACH(const LinkDescriptor* ld, links){
1140 | if(ld->remoteNode != node) continue;
1141 | const EndpointDescriptor& ep = bc->getEndpointDescriptor(ld->communicationId);
1142 | if(ep.toString().size()==0) continue;
1143 | if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1144 | }
1145 |
1146 | return EndpointDescriptor::UNSPECIFIED();
1147 | }
1148 |
1149 | // ----------------------------------------------------------------------------
1150 |
1151 | bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1152 | sideport = _sideport;
1153 | _sideport->configure( this );
1154 | }
1155 |
1156 | bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1157 | sideport = &SideportListener::DEFAULT;
1158 | }
1159 |
1160 | // ----------------------------------------------------------------------------
1161 |
1162 | bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1163 | logging_debug( "binding communication listener " << listener
1164 | << " on serviceid " << sid.toString() );
1165 |
1166 | if( communicationListeners.contains( sid ) ) {
1167 | logging_error( "some listener already registered for service id "
1168 | << sid.toString() );
1169 | return false;
1170 | }
1171 |
1172 | communicationListeners.registerItem( listener, sid );
1173 | return true;
1174 | }
1175 |
1176 |
1177 | bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1178 | logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1179 |
1180 | if( !communicationListeners.contains( sid ) ) {
1181 | logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1182 | return false;
1183 | }
1184 |
1185 | if( communicationListeners.get(sid) != listener ) {
1186 | logging_warn( "listener bound to service id " << sid.toString()
1187 | << " is different than listener trying to unbind" );
1188 | return false;
1189 | }
1190 |
1191 | communicationListeners.unregisterItem( sid );
1192 | return true;
1193 | }
1194 |
1195 | // ----------------------------------------------------------------------------
1196 |
1197 | bool BaseOverlay::bind(NodeListener* listener) {
1198 | logging_debug( "Binding node listener " << listener );
1199 |
1200 | // already bound? yes-> warning
1201 | NodeListenerVector::iterator i =
1202 | find( nodeListeners.begin(), nodeListeners.end(), listener );
1203 | if( i != nodeListeners.end() ) {
1204 | logging_warn("Node listener " << listener << " is already bound!" );
1205 | return false;
1206 | }
1207 |
1208 | // no-> add
1209 | nodeListeners.push_back( listener );
1210 | return true;
1211 | }
1212 |
1213 | bool BaseOverlay::unbind(NodeListener* listener) {
1214 | logging_debug( "Unbinding node listener " << listener );
1215 |
1216 | // already unbound? yes-> warning
1217 | NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1218 | if( i == nodeListeners.end() ) {
1219 | logging_warn( "Node listener " << listener << " is not bound!" );
1220 | return false;
1221 | }
1222 |
1223 | // no-> remove
1224 | nodeListeners.erase( i );
1225 | return true;
1226 | }
1227 |
1228 | // ----------------------------------------------------------------------------
1229 |
1230 | void BaseOverlay::onLinkUp(const LinkID& id,
1231 | const address_v* local, const address_v* remote) {
1232 | logging_debug( "Link up with base communication link id=" << id );
1233 |
1234 | // get descriptor for link
1235 | LinkDescriptor* ld = getDescriptor(id, true);
1236 |
1237 | // handle bootstrap link we initiated
1238 | if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1239 | logging_info(
1240 | "Join has been initiated by me and the link is now up. " <<
1241 | "Sending out join request for SpoVNet " << spovnetId.toString()
1242 | );
1243 |
1244 | // send join request message
1245 | OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1246 | OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1247 | JoinRequest joinRequest( spovnetId, nodeId );
1248 | overlayMsg.encapsulate( &joinRequest );
1249 | bc->sendMessage( id, &overlayMsg );
1250 | return;
1251 | }
1252 |
1253 | // no link found? -> link establishment from remote, add one!
1254 | if (ld == NULL) {
1255 | ld = addDescriptor( id );
1256 | logging_info( "onLinkUp (remote request) descriptor: " << ld );
1257 |
1258 | // update descriptor
1259 | ld->fromRemote = true;
1260 | ld->communicationId = id;
1261 | ld->communicationUp = true;
1262 | ld->setAutoUsed();
1263 | ld->setAlive();
1264 |
1265 | // in this case, do not inform listener, since service it unknown
1266 | // -> wait for update message!
1267 |
1268 | // link mapping found? -> send update message with node-id and service id
1269 | } else {
1270 | logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1271 |
1272 | // update descriptor
1273 | ld->setAutoUsed();
1274 | ld->setAlive();
1275 | ld->communicationUp = true;
1276 | ld->fromRemote = false;
1277 |
1278 | // if link is a relayed link->convert to direct link
1279 | if (ld->relayed) {
1280 | logging_info( "Converting to direct link: " << ld );
1281 | ld->up = true;
1282 | ld->relayed = false;
1283 | OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1284 | overMsg.setSourceLink( ld->overlayId );
1285 | overMsg.setDestinationLink( ld->remoteLink );
1286 | send_link( &overMsg, ld->overlayId );
1287 | } else {
1288 | // note: necessary to validate the link on the remote side!
1289 | logging_info( "Sending out update" <<
1290 | " for service " << ld->service.toString() <<
1291 | " with local node id " << nodeId.toString() <<
1292 | " on link " << ld->overlayId.toString() );
1293 |
1294 | // compile and send update message
1295 | OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1296 | overlayMsg.setSourceLink(ld->overlayId);
1297 | overlayMsg.setAutoLink( ld->autolink );
1298 | send_link( &overlayMsg, ld->overlayId, true );
1299 | }
1300 | }
1301 | }
1302 |
1303 | void BaseOverlay::onLinkDown(const LinkID& id,
1304 | const address_v* local, const address_v* remote) {
1305 |
1306 | // erase bootstrap links
1307 | vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1308 | if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1309 |
1310 | // get descriptor for link
1311 | LinkDescriptor* ld = getDescriptor(id, true);
1312 | if ( ld == NULL ) return; // not found? ->ignore!
1313 | logging_info( "onLinkDown descriptor: " << ld );
1314 |
1315 | // removing relay link information
1316 | removeRelayLink(ld->overlayId);
1317 |
1318 | // inform listeners about link down
1319 | ld->communicationUp = false;
1320 | if (!ld->service.isUnspecified()) {
1321 | CommunicationListener* lst = getListener(ld->service);
1322 | if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1323 | sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1324 | }
1325 |
1326 | // delete all queued messages (auto links)
1327 | if( ld->messageQueue.size() > 0 ) {
1328 | logging_warn( "Dropping link " << id.toString() << " that has "
1329 | << ld->messageQueue.size() << " waiting messages" );
1330 | ld->flushQueue();
1331 | }
1332 |
1333 | // erase mapping
1334 | eraseDescriptor(ld->overlayId);
1335 | }
1336 |
1337 | void BaseOverlay::onLinkChanged(const LinkID& id,
1338 | const address_v* oldlocal, const address_v* newlocal,
1339 | const address_v* oldremote, const address_v* newremote) {
1340 |
1341 | // get descriptor for link
1342 | LinkDescriptor* ld = getDescriptor(id, true);
1343 | if ( ld == NULL ) return; // not found? ->ignore!
1344 | logging_debug( "onLinkChanged descriptor: " << ld );
1345 |
1346 | // inform listeners
1347 | ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1348 | sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1349 |
1350 | // autolinks: refresh timestamp
1351 | ld->setAutoUsed();
1352 | }
1353 |
1354 | void BaseOverlay::onLinkFail(const LinkID& id,
1355 | const address_v* local, const address_v* remote) {
1356 | logging_debug( "Link fail with base communication link id=" << id );
1357 |
1358 | // erase bootstrap links
1359 | vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1360 | if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1361 |
1362 | // get descriptor for link
1363 | LinkDescriptor* ld = getDescriptor(id, true);
1364 | if ( ld == NULL ) return; // not found? ->ignore!
1365 | logging_debug( "Link failed id=" << ld->overlayId.toString() );
1366 |
1367 | // inform listeners
1368 | ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1369 | sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1370 | }
1371 |
1372 | void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1373 | const address_v* remote, const QoSParameterSet& qos) {
1374 | logging_debug( "Link quality changed with base communication link id=" << id );
1375 |
1376 | // get descriptor for link
1377 | LinkDescriptor* ld = getDescriptor(id, true);
1378 | if ( ld == NULL ) return; // not found? ->ignore!
1379 | logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1380 | }
1381 |
1382 | bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1383 | const address_v* remote ) {
1384 | logging_debug("Accepting link request from " << remote->to_string() );
1385 | return true;
1386 | }
1387 |
1388 | /// handles a message from base communication
1389 | bool BaseOverlay::receiveMessage(const Message* message,
1390 | const LinkID& link, const NodeID& ) {
1391 | // get descriptor for link
1392 | LinkDescriptor* ld = getDescriptor( link, true );
1393 | return handleMessage( message, ld, link );
1394 | }
1395 |
1396 | // ----------------------------------------------------------------------------
1397 |
1398 | /// Handle spovnet instance join requests
1399 | bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1400 |
1401 | // decapsulate message
1402 | JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1403 | logging_info( "Received join request for spovnet " <<
1404 | joinReq->getSpoVNetID().toString() );
1405 |
1406 | // check spovnet id
1407 | if( joinReq->getSpoVNetID() != spovnetId ) {
1408 | logging_error(
1409 | "Received join request for spovnet we don't handle " <<
1410 | joinReq->getSpoVNetID().toString() );
1411 | return false;
1412 | }
1413 |
1414 | // TODO: here you can implement mechanisms to deny joining of a node
1415 | bool allow = true;
1416 | logging_info( "Sending join reply for spovnet " <<
1417 | spovnetId.toString() << " to node " <<
1418 | overlayMsg->getSourceNode().toString() <<
1419 | ". Result: " << (allow ? "allowed" : "denied") );
1420 | joiningNodes.push_back( overlayMsg->getSourceNode() );
1421 |
1422 | // return overlay parameters
1423 | assert( overlayInterface != NULL );
1424 | logging_debug( "Using bootstrap end-point "
1425 | << getEndpointDescriptor().toString() )
1426 | OverlayParameterSet parameters = overlayInterface->getParameters();
1427 | OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1428 | OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1429 | JoinReply replyMsg( spovnetId, parameters,
1430 | allow, getEndpointDescriptor() );
1431 | retmsg.encapsulate(&replyMsg);
1432 | bc->sendMessage( bcLink, &retmsg );
1433 |
1434 | return true;
1435 | }
1436 |
1437 | /// Handle replies to spovnet instance join requests
1438 | bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1439 | // decapsulate message
1440 | logging_debug("received join reply message");
1441 | JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1442 |
1443 | // correct spovnet?
1444 | if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1445 | logging_error( "Received SpoVNet join reply for " <<
1446 | replyMsg->getSpoVNetID().toString() <<
1447 | " != " << spovnetId.toString() );
1448 | delete replyMsg;
1449 | return false;
1450 | }
1451 |
1452 | // access granted? no -> fail
1453 | if( !replyMsg->getJoinAllowed() ) {
1454 | logging_error( "Our join request has been denied" );
1455 |
1456 | // drop initiator link
1457 | if( !bcLink.isUnspecified() ){
1458 | bc->dropLink( bcLink );
1459 |
1460 | vector<LinkID>::iterator it = std::find(
1461 | bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1462 | if( it != bootstrapLinks.end() )
1463 | bootstrapLinks.erase(it);
1464 | }
1465 |
1466 | // inform all registered services of the event
1467 | BOOST_FOREACH( NodeListener* i, nodeListeners )
1468 | i->onJoinFailed( spovnetId );
1469 |
1470 | delete replyMsg;
1471 | return true;
1472 | }
1473 |
1474 | // access has been granted -> continue!
1475 | logging_info("Join request has been accepted for spovnet " <<
1476 | spovnetId.toString() );
1477 |
1478 | logging_debug( "Using bootstrap end-point "
1479 | << replyMsg->getBootstrapEndpoint().toString() );
1480 |
1481 | // create overlay structure from spovnet parameter set
1482 | // if we have not boostrapped yet against some other node
1483 | if( overlayInterface == NULL ){
1484 |
1485 | logging_debug("first-time bootstrapping");
1486 |
1487 | overlayInterface = OverlayFactory::create(
1488 | *this, replyMsg->getParam(), nodeId, this );
1489 |
1490 | // overlay structure supported? no-> fail!
1491 | if( overlayInterface == NULL ) {
1492 | logging_error( "overlay structure not supported" );
1493 |
1494 | if( !bcLink.isUnspecified() ){
1495 | bc->dropLink( bcLink );
1496 |
1497 | vector<LinkID>::iterator it = std::find(
1498 | bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1499 | if( it != bootstrapLinks.end() )
1500 | bootstrapLinks.erase(it);
1501 | }
1502 |
1503 | // inform all registered services of the event
1504 | BOOST_FOREACH( NodeListener* i, nodeListeners )
1505 | i->onJoinFailed( spovnetId );
1506 |
1507 | delete replyMsg;
1508 | return true;
1509 | }
1510 |
1511 | // everything ok-> join the overlay!
1512 | state = BaseOverlayStateCompleted;
1513 | overlayInterface->createOverlay();
1514 |
1515 | overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1516 | overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1517 |
1518 | // update ovlvis
1519 | //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1520 |
1521 | // inform all registered services of the event
1522 | BOOST_FOREACH( NodeListener* i, nodeListeners )
1523 | i->onJoinCompleted( spovnetId );
1524 |
1525 | delete replyMsg;
1526 |
1527 | } else {
1528 |
1529 | // this is not the first bootstrap, just join the additional node
1530 | logging_debug("not first-time bootstrapping");
1531 | overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1532 | overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1533 |
1534 | delete replyMsg;
1535 |
1536 | } // if( overlayInterface == NULL )
1537 |
1538 | return true;
1539 | }
1540 |
1541 |
1542 | bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1543 | // get service
1544 | const ServiceID& service = overlayMsg->getService();
1545 | logging_debug( "Received data for service " << service.toString()
1546 | << " on link " << overlayMsg->getDestinationLink().toString() );
1547 |
1548 | // delegate data message
1549 | CommunicationListener* lst = getListener(service);
1550 | if(lst != NULL){
1551 | lst->onMessage(
1552 | overlayMsg,
1553 | overlayMsg->getSourceNode(),
1554 | overlayMsg->getDestinationLink()
1555 | );
1556 | }
1557 |
1558 | return true;
1559 | }
1560 |
1561 |
1562 | bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1563 |
1564 | if( ld == NULL ) {
1565 | logging_warn( "received overlay update message for link for "
1566 | << "which we have no mapping" );
1567 | return false;
1568 | }
1569 | logging_info("Received type update message on link " << ld );
1570 |
1571 | // update our link mapping information for this link
1572 | bool changed =
1573 | ( ld->remoteNode != overlayMsg->getSourceNode() )
1574 | || ( ld->service != overlayMsg->getService() );
1575 |
1576 | // set parameters
1577 | ld->up = true;
1578 | ld->remoteNode = overlayMsg->getSourceNode();
1579 | ld->remoteLink = overlayMsg->getSourceLink();
1580 | ld->service = overlayMsg->getService();
1581 | ld->autolink = overlayMsg->isAutoLink();
1582 |
1583 | // if our link information changed, we send out an update, too
1584 | if( changed ) {
1585 | overlayMsg->swapRoles();
1586 | overlayMsg->setSourceNode(nodeId);
1587 | overlayMsg->setSourceLink(ld->overlayId);
1588 | overlayMsg->setService(ld->service);
1589 | send( overlayMsg, ld );
1590 | }
1591 |
1592 | // service registered? no-> error!
1593 | if( !communicationListeners.contains( ld->service ) ) {
1594 | logging_warn( "Link up: event listener has not been registered" );
1595 | return false;
1596 | }
1597 |
1598 | // default or no service registered?
1599 | CommunicationListener* listener = communicationListeners.get( ld->service );
1600 | if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1601 | logging_warn("Link up: event listener is default or null!" );
1602 | return true;
1603 | }
1604 |
1605 | // update descriptor
1606 | ld->listener = listener;
1607 | ld->setAutoUsed();
1608 | ld->setAlive();
1609 |
1610 | // ask the service whether it wants to accept this link
1611 | if( !listener->onLinkRequest(ld->remoteNode) ) {
1612 |
1613 | logging_debug("Link id=" << ld->overlayId.toString() <<
1614 | " has been denied by service " << ld->service.toString() << ", dropping link");
1615 |
1616 | // prevent onLinkDown calls to the service
1617 | ld->listener = &CommunicationListener::DEFAULT;
1618 |
1619 | // drop the link
1620 | dropLink( ld->overlayId );
1621 | return true;
1622 | }
1623 |
1624 | // set link up
1625 | ld->up = true;
1626 | logging_info( "Link has been accepted by service and is up: " << ld );
1627 |
1628 | // auto links: link has been accepted -> send queued messages
1629 | if( ld->messageQueue.size() > 0 ) {
1630 | logging_info( "Sending out queued messages on link " << ld );
1631 | BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1632 | sendMessage( msg, ld->overlayId );
1633 | delete msg;
1634 | }
1635 | ld->messageQueue.clear();
1636 | }
1637 |
1638 | // call the notification functions
1639 | listener->onLinkUp( ld->overlayId, ld->remoteNode );
1640 | sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1641 |
1642 | return true;
1643 | }
1644 |
1645 | /// handle a link request and reply
1646 | bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1647 | logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1648 |
1649 | //TODO: Check if a request has already been sent using getSourceLink() ...
1650 |
1651 | // create link descriptor
1652 | LinkDescriptor* ldn = addDescriptor();
1653 |
1654 | // flags
1655 | ldn->up = true;
1656 | ldn->fromRemote = true;
1657 | ldn->relayed = true;
1658 |
1659 | // parameters
1660 | ldn->service = overlayMsg->getService();
1661 | ldn->listener = getListener(ldn->service);
1662 | ldn->remoteNode = overlayMsg->getSourceNode();
1663 | ldn->remoteLink = overlayMsg->getSourceLink();
1664 |
1665 | // update time-stamps
1666 | ldn->setAlive();
1667 | ldn->setAutoUsed();
1668 |
1669 | // create reply message and send back!
1670 | overlayMsg->swapRoles(); // swap source/destination
1671 | overlayMsg->setType(OverlayMsg::typeLinkReply);
1672 | overlayMsg->setSourceLink(ldn->overlayId);
1673 | overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1674 | overlayMsg->setRelayed(true);
1675 | send( overlayMsg, ld ); // send back to link
1676 |
1677 | // inform listener
1678 | if(ldn != NULL && ldn->listener != NULL)
1679 | ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1680 |
1681 | return true;
1682 | }
1683 |
1684 | bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1685 |
1686 | // find link request
1687 | LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1688 |
1689 | // not found? yes-> drop with error!
1690 | if (ldn == NULL) {
1691 | logging_error( "No link request pending for "
1692 | << overlayMsg->getDestinationLink().toString() );
1693 | return false;
1694 | }
1695 | logging_debug("Handling link reply for " << ldn )
1696 |
1697 | // check if already up
1698 | if (ldn->up) {
1699 | logging_warn( "Link already up: " << ldn );
1700 | return true;
1701 | }
1702 |
1703 | // debug message
1704 | logging_debug( "Link request reply received. Establishing link"
1705 | << " for service " << overlayMsg->getService().toString()
1706 | << " with local id=" << overlayMsg->getDestinationLink()
1707 | << " and remote link id=" << overlayMsg->getSourceLink()
1708 | << " to " << overlayMsg->getSourceEndpoint().toString()
1709 | );
1710 |
1711 | // set local link descriptor data
1712 | ldn->up = true;
1713 | ldn->relayed = true;
1714 | ldn->service = overlayMsg->getService();
1715 | ldn->listener = getListener(ldn->service);
1716 | ldn->remoteLink = overlayMsg->getSourceLink();
1717 | ldn->remoteNode = overlayMsg->getSourceNode();
1718 |
1719 | // update timestamps
1720 | ldn->setAlive();
1721 | ldn->setAutoUsed();
1722 |
1723 | // auto links: link has been accepted -> send queued messages
1724 | if( ldn->messageQueue.size() > 0 ) {
1725 | logging_info( "Sending out queued messages on link " <<
1726 | ldn->overlayId.toString() );
1727 | BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1728 | sendMessage( msg, ldn->overlayId );
1729 | delete msg;
1730 | }
1731 | ldn->messageQueue.clear();
1732 | }
1733 |
1734 | // inform listeners about new link
1735 | ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1736 |
1737 | // try to replace relay link with direct link
1738 | ldn->communicationId =
1739 | bc->establishLink( overlayMsg->getSourceEndpoint() );
1740 |
1741 | return true;
1742 | }
1743 |
1744 | /// handle a keep-alive message for a link
1745 | bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1746 | LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1747 | if ( rld != NULL ) {
1748 | logging_debug("Keep-Alive for " <<
1749 | overlayMsg->getDestinationLink() );
1750 | if (overlayMsg->isRouteRecord())
1751 | rld->routeRecord = overlayMsg->getRouteRecord();
1752 | rld->setAlive();
1753 | return true;
1754 | } else {
1755 | logging_error("Keep-Alive for "
1756 | << overlayMsg->getDestinationLink() << ": link unknown." );
1757 | return false;
1758 | }
1759 | }
1760 |
1761 | /// handle a direct link message
1762 | bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1763 | logging_debug( "Received direct link replacement request" );
1764 |
1765 | /// get destination overlay link
1766 | LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1767 | if (rld == NULL || ld == NULL) {
1768 | logging_error("Direct link replacement: Link "
1769 | << overlayMsg->getDestinationLink() << "not found error." );
1770 | return false;
1771 | }
1772 | logging_info( "Received direct link convert notification for " << rld );
1773 |
1774 | // update information
1775 | rld->communicationId = ld->communicationId;
1776 | rld->communicationUp = true;
1777 | rld->relayed = false;
1778 |
1779 | // mark used and alive!
1780 | rld->setAlive();
1781 | rld->setAutoUsed();
1782 |
1783 | // erase the original descriptor
1784 | eraseDescriptor(ld->overlayId);
1785 | }
1786 |
1787 | /// handles an incoming message
1788 | bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1789 | const LinkID bcLink ) {
1790 | logging_debug( "Handling message: " << message->toString());
1791 |
1792 | // decapsulate overlay message
1793 | OverlayMsg* overlayMsg =
1794 | const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1795 | if( overlayMsg == NULL ) return false;
1796 |
1797 | // increase number of hops
1798 | overlayMsg->increaseNumHops();
1799 |
1800 | // refresh relay information
1801 | refreshRelayInformation( overlayMsg, ld );
1802 |
1803 | // update route record
1804 | overlayMsg->addRouteRecord(nodeId);
1805 |
1806 | // handle dht messages (do not route)
1807 | if (overlayMsg->isDHTMessage())
1808 | return handleDHTMessage(overlayMsg);
1809 |
1810 | // handle signaling messages (do not route!)
1811 | if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1812 | overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1813 | overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1814 | delete overlayMsg;
1815 | return true;
1816 | }
1817 |
1818 | // message for reached destination? no-> route message
1819 | if (!overlayMsg->getDestinationNode().isUnspecified() &&
1820 | overlayMsg->getDestinationNode() != nodeId ) {
1821 | logging_debug("Routing message "
1822 | << " from " << overlayMsg->getSourceNode()
1823 | << " to " << overlayMsg->getDestinationNode()
1824 | );
1825 | route( overlayMsg );
1826 | delete overlayMsg;
1827 | return true;
1828 | }
1829 |
1830 | // handle DHT response messages
1831 | if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
1832 | bool ret = handleDHTMessage(overlayMsg);
1833 | delete overlayMsg;
1834 | return ret;
1835 | }
1836 |
1837 | // handle base overlay message
1838 | bool ret = false; // return value
1839 | switch ( overlayMsg->getType() ) {
1840 |
1841 | // data transport messages
1842 | case OverlayMsg::typeData:
1843 | ret = handleData(overlayMsg, ld); break;
1844 |
1845 | // overlay setup messages
1846 | case OverlayMsg::typeJoinRequest:
1847 | ret = handleJoinRequest(overlayMsg, bcLink ); break;
1848 | case OverlayMsg::typeJoinReply:
1849 | ret = handleJoinReply(overlayMsg, bcLink ); break;
1850 |
1851 | // link specific messages
1852 | case OverlayMsg::typeLinkRequest:
1853 | ret = handleLinkRequest(overlayMsg, ld ); break;
1854 | case OverlayMsg::typeLinkReply:
1855 | ret = handleLinkReply(overlayMsg, ld ); break;
1856 | case OverlayMsg::typeLinkUpdate:
1857 | ret = handleLinkUpdate(overlayMsg, ld ); break;
1858 | case OverlayMsg::typeLinkAlive:
1859 | ret = handleLinkAlive(overlayMsg, ld ); break;
1860 | case OverlayMsg::typeLinkDirect:
1861 | ret = handleLinkDirect(overlayMsg, ld ); break;
1862 |
1863 | // handle unknown message type
1864 | default: {
1865 | logging_error( "received message in invalid state! don't know " <<
1866 | "what to do with this message of type " << overlayMsg->getType() );
1867 | ret = false;
1868 | break;
1869 | }
1870 | }
1871 |
1872 | // free overlay message and return value
1873 | delete overlayMsg;
1874 | return ret;
1875 | }
1876 |
1877 | // ----------------------------------------------------------------------------
1878 |
1879 | void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1880 |
1881 | logging_debug( "broadcasting message to all known nodes " <<
1882 | "in the overlay from service " + service.toString() );
1883 |
1884 | OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1885 | OverlayInterface::NodeList::iterator i = nodes.begin();
1886 | for(; i != nodes.end(); i++ ) {
1887 | if( *i == nodeId) continue; // don't send to ourselfs
1888 | sendMessage( message, *i, service );
1889 | }
1890 | }
1891 |
1892 | /// return the overlay neighbors
1893 | vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1894 | // the known nodes _can_ also include our node, so we remove ourself
1895 | vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1896 | vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1897 | if( i != nodes.end() ) nodes.erase( i );
1898 | return nodes;
1899 | }
1900 |
1901 | const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1902 | if( lid == LinkID::UNSPECIFIED ) return nodeId;
1903 | const LinkDescriptor* ld = getDescriptor(lid);
1904 | if( ld == NULL ) return NodeID::UNSPECIFIED;
1905 | else return ld->remoteNode;
1906 | }
1907 |
1908 | vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1909 | vector<LinkID> linkvector;
1910 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
1911 | if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1912 | linkvector.push_back( ld->overlayId );
1913 | }
1914 | }
1915 | return linkvector;
1916 | }
1917 |
1918 |
1919 | void BaseOverlay::onNodeJoin(const NodeID& node) {
1920 | JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1921 | if( i == joiningNodes.end() ) return;
1922 |
1923 | logging_info( "node has successfully joined baseoverlay and overlay structure "
1924 | << node.toString() );
1925 |
1926 | joiningNodes.erase( i );
1927 | }
1928 |
1929 | void BaseOverlay::eventFunction() {
1930 | stabilizeRelays();
1931 | stabilizeLinks();
1932 | stabilizeDHT();
1933 | updateVisual();
1934 | }
1935 |
1936 | void BaseOverlay::updateVisual(){
1937 |
1938 | //
1939 | // update base overlay structure
1940 | //
1941 |
1942 | static NodeID pre = NodeID::UNSPECIFIED;
1943 | static NodeID suc = NodeID::UNSPECIFIED;
1944 |
1945 | vector<NodeID> nodes = this->getOverlayNeighbors(false);
1946 |
1947 | if(nodes.size() == 0){
1948 |
1949 | if(pre != NodeID::UNSPECIFIED){
1950 | visual.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1951 | pre = NodeID::UNSPECIFIED;
1952 | }
1953 | if(suc != NodeID::UNSPECIFIED){
1954 | visual.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1955 | suc = NodeID::UNSPECIFIED;
1956 | }
1957 |
1958 | } // if(nodes.size() == 0)
1959 |
1960 | if(nodes.size() == 1){
1961 | // only one node, make this pre and succ
1962 | // and then go into the node.size()==2 case
1963 | //nodes.push_back(nodes.at(0));
1964 |
1965 | if(pre != nodes.at(0)){
1966 | pre = nodes.at(0);
1967 | if(pre != NodeID::UNSPECIFIED)
1968 | visual.visConnect(visualIdOverlay, this->nodeId, pre, "");
1969 | }
1970 | }
1971 |
1972 | if(nodes.size() == 2){
1973 |
1974 | // old finger
1975 | if(nodes.at(0) != pre){
1976 | if(pre != NodeID::UNSPECIFIED)
1977 | visual.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1978 | pre = NodeID::UNSPECIFIED;
1979 | }
1980 | if(nodes.at(1) != suc){
1981 | if(suc != NodeID::UNSPECIFIED)
1982 | visual.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1983 | suc = NodeID::UNSPECIFIED;
1984 | }
1985 |
1986 | // connect with fingers
1987 | if(pre == NodeID::UNSPECIFIED){
1988 | pre = nodes.at(0);
1989 | if(pre != NodeID::UNSPECIFIED)
1990 | visual.visConnect(visualIdOverlay, this->nodeId, pre, "");
1991 | }
1992 | if(suc == NodeID::UNSPECIFIED){
1993 | suc = nodes.at(1);
1994 | if(suc != NodeID::UNSPECIFIED)
1995 | visual.visConnect(visualIdOverlay, this->nodeId, suc, "");
1996 | }
1997 |
1998 | } //if(nodes.size() == 2)
1999 |
2000 | // {
2001 | // logging_error("================================");
2002 | // logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
2003 | // logging_error("================================");
2004 | // if(nodes.size()>= 1){
2005 | // logging_error("real pre " << nodes.at(0).toString());
2006 | // logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
2007 | // }
2008 | // if(nodes.size()>= 2){
2009 | // logging_error("real suc " << nodes.at(1).toString());
2010 | // logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
2011 | // }
2012 | // logging_error("================================");
2013 | // if(pre == NodeID::UNSPECIFIED){
2014 | // logging_error("pre: unspecified");
2015 | // }else{
2016 | // unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
2017 | // logging_error("pre: " << prei);
2018 | // }
2019 | // if(suc == NodeID::UNSPECIFIED){
2020 | // logging_error("suc: unspecified");
2021 | // }else{
2022 | // unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
2023 | // logging_error("suc: " << suci);
2024 | // }
2025 | // logging_error("================================");
2026 | // }
2027 |
2028 | //
2029 | // update base communication links
2030 | //
2031 |
2032 | static set<NodeID> linkset;
2033 | set<NodeID> remotenodes;
2034 | BOOST_FOREACH( LinkDescriptor* ld, links ) {
2035 | if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
2036 | continue;
2037 |
2038 | if (ld->routeRecord.size()>1 && ld->relayed) {
2039 | for (size_t i=1; i<ld->routeRecord.size(); i++)
2040 | remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
2041 | } else {
2042 | remotenodes.insert(ld->remoteNode);
2043 | }
2044 | }
2045 |
2046 | // which links are old and need deletion?
2047 | BOOST_FOREACH(NodeID n, linkset){
2048 | if(remotenodes.find(n) == remotenodes.end()){
2049 | visual.visDisconnect(visualIdBase, this->nodeId, n, "");
2050 | linkset.erase(n);
2051 | }
2052 | }
2053 |
2054 | // which links are new and need creation?
2055 | BOOST_FOREACH(NodeID n, remotenodes){
2056 | if(linkset.find(n) == linkset.end()){
2057 | visual.visConnect(visualIdBase, this->nodeId, n, "");
2058 | linkset.insert(n);
2059 | }
2060 | }
2061 |
2062 | }
2063 |
2064 | // ----------------------------------------------------------------------------
2065 |
2066 | /// stabilize DHT state
2067 | void BaseOverlay::stabilizeDHT() {
2068 | // remove old values from DHT
2069 | BOOST_FOREACH( DHTEntry& entry, dht->entries ) {
2070 | // erase old entries
2071 | entry.erase_expired_entries();
2072 | }
2073 |
2074 | // re-publish values
2075 | BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {
2076 | // erase old entries
2077 | entry.erase_expired_entries();
2078 |
2079 | // re-publish values
2080 | BOOST_FOREACH( ValueEntry& value, entry.values )
2081 | dhtPut(entry.key, value.get_value(), 0 );
2082 | }
2083 | }
2084 |
2085 | // handle DHT messages
2086 | bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
2087 |
2088 | // decapsulate message
2089 | logging_debug("received DHT message");
2090 | DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
2091 |
2092 | // handle DHT data message
2093 | if (msg->getType()==OverlayMsg::typeDHTData) {
2094 | const ServiceID& service = msg->getService();
2095 | logging_info( "Received DHT data for service " << service.toString() );
2096 |
2097 | // delegate data message
2098 | CommunicationListener* lst = getListener(service);
2099 | if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
2100 | return true;
2101 | }
2102 |
2103 | // route message to closest node
2104 | if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
2105 | logging_debug("Routing DHT message to closest node "
2106 | << " from " << msg->getSourceNode()
2107 | << " to " << msg->getDestinationNode()
2108 | );
2109 | route( msg );
2110 | delete msg;
2111 | return true;
2112 | }
2113 |
2114 | // now, we are the closest node...
2115 | switch (msg->getType()) {
2116 | case OverlayMsg::typeDHTPut: {
2117 | if (dhtMsg->doReplace()) dht->remove(dhtMsg->getKey());
2118 | BOOST_FOREACH( Data value, dhtMsg->getValues() )
2119 | dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
2120 | break;
2121 | }
2122 |
2123 | case OverlayMsg::typeDHTGet: {
2124 | logging_info("DHT-Get: key=" << dhtMsg->getKey() );
2125 | vector<Data> vect = dht->get(dhtMsg->getKey());
2126 | BOOST_FOREACH(const Data& d, vect)
2127 | logging_info("DHT-Get: value=" << d);
2128 | OverlayMsg omsg(*msg);
2129 | omsg.swapRoles();
2130 | omsg.setType(OverlayMsg::typeDHTData);
2131 | DHTMessage dhtmsg(dhtMsg->getKey(), vect);
2132 | omsg.encapsulate(&dhtmsg);
2133 | dhtSend(&omsg, omsg.getDestinationNode());
2134 | break;
2135 | }
2136 |
2137 | case OverlayMsg::typeDHTRemove: {
2138 | if (dhtMsg->hasValues()) {
2139 | BOOST_FOREACH( Data value, dhtMsg->getValues() )
2140 | dht->remove(dhtMsg->getKey(), value );
2141 | } else
2142 | dht->remove( dhtMsg->getKey() );
2143 | break;
2144 | }
2145 |
2146 | default:
2147 | logging_error("DHT Message type unknown.");
2148 | return false;
2149 | }
2150 | delete msg;
2151 | return true;
2152 | }
2153 |
2154 | /// put a value to the DHT with a ttl given in seconds
2155 | void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace ) {
2156 |
2157 | logging_info("DHT: putting key=" << key
2158 | << " value=" << value
2159 | << " ttl=" << ttl
2160 | << " replace=" << replace
2161 | );
2162 |
2163 |
2164 | // put into local data store (for refreshes)
2165 | if (replace) localDHT->remove(key);
2166 | localDHT->put(key, value, ttl);
2167 |
2168 | // calculate hash
2169 | NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2170 | DHTMessage dhtmsg(key,value);
2171 | dhtmsg.setReplace(replace);
2172 | dhtmsg.setTTL(ttl);
2173 |
2174 | OverlayMsg msg(OverlayMsg::typeDHTPut);
2175 | msg.encapsulate( &dhtmsg );
2176 | dhtSend(&msg, dest);
2177 | }
2178 |
2179 | /// removes a key value pair from the DHT
2180 | void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
2181 | // remove from local data store
2182 | localDHT->remove(key,value);
2183 |
2184 | // calculate hash
2185 | NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2186 | DHTMessage dhtmsg(key,value);
2187 |
2188 | // send message
2189 | OverlayMsg msg(OverlayMsg::typeDHTRemove);
2190 | msg.encapsulate( &dhtmsg );
2191 | dhtSend(&msg, dest);
2192 | }
2193 |
2194 | /// removes all data stored at the given key
2195 | void BaseOverlay::dhtRemove( const Data& key ) {
2196 | // calculate hash
2197 | NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2198 | DHTMessage dhtmsg(key);
2199 |
2200 | // send message
2201 | OverlayMsg msg(OverlayMsg::typeDHTRemove);
2202 | msg.encapsulate( &dhtmsg );
2203 | dhtSend(&msg, dest);
2204 | }
2205 |
2206 | /// requests data stored using key
2207 | void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
2208 | logging_info("DHT: trying to resolve key=" <<
2209 | key << " for service=" << service.toString() );
2210 |
2211 | // calculate hash
2212 | NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2213 | DHTMessage dhtmsg(key);
2214 |
2215 | // send message
2216 | OverlayMsg msg(OverlayMsg::typeDHTGet);
2217 | msg.setService(service);
2218 | msg.encapsulate( &dhtmsg );
2219 | dhtSend(&msg, dest);
2220 | }
2221 |
2222 | void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {
2223 | logging_info("DHT: sending message with key=" << dest.toString() );
2224 | msg->setSourceNode(this->nodeId);
2225 | msg->setDestinationNode(dest);
2226 |
2227 | // local storage? yes-> put into DHT directly
2228 | if (overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
2229 | Data d = data_serialize(msg);
2230 | Message* m2 = new Message(d);
2231 | OverlayMsg* m3 = m2->decapsulate<OverlayMsg>();
2232 | handleDHTMessage(m3);
2233 | delete m2;
2234 | return;
2235 | }
2236 |
2237 | // send message "normally"
2238 | send(msg, dest);
2239 | }
2240 |
2241 |
2242 | }} // namespace ariba, overlay