source: source/ariba/overlay/BaseOverlay.cpp@ 6786

Last change on this file since 6786 was 6786, checked in by mies, 15 years ago

Changed Data to Message conversion constructor in Message to explicit
Fixed some general bugs in Data: operator<<
Fixed bug in DHTMessage: allow unspecified key/values
Added local DHT message delivery
Adapted sources to work with gcc 4.4.1

File size: 58.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/DHTMessage.h"
54#include "ariba/overlay/messages/JoinRequest.h"
55#include "ariba/overlay/messages/JoinReply.h"
56
57#include "ariba/utility/visual/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62class ValueEntry {
63public:
64 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
65 last_change(time(NULL)), value(value.clone()) {
66 }
67
68 ~ValueEntry() {
69 value.release();
70 }
71
72 void refresh() {
73 last_update = time(NULL);
74 }
75
76 void set_value( const Data& value ) {
77 this->value.release();
78 this->value = value.clone();
79 this->last_change = time(NULL);
80 this->last_update = time(NULL);
81 }
82
83 Data get_value() const {
84 return value;
85 }
86
87 uint16_t get_ttl() const {
88 return ttl;
89 }
90
91 void set_ttl( uint16_t ttl ) {
92 this->ttl = ttl;
93 }
94
95 bool is_ttl_elapsed() const {
96 // is persistent? yes-> always return false
97 if (ttl==0) return false;
98
99 // return true, if ttl is elapsed
100 return ( difftime( time(NULL), this->last_update ) >= ttl );
101 }
102
103private:
104 uint16_t ttl;
105 time_t last_update;
106 time_t last_change;
107 Data value;
108};
109
110class DHTEntry {
111public:
112 Data key;
113 vector<ValueEntry> values;
114
115 vector<Data> get_values() {
116 vector<Data> vect;
117 BOOST_FOREACH( ValueEntry& e, values )
118 vect.push_back( e.get_value() );
119 return vect;
120 }
121};
122
123class DHT {
124public:
125 typedef vector<DHTEntry> Entries;
126 typedef vector<ValueEntry> Values;
127 Entries entries;
128
129 static bool equals( const Data& lhs, const Data& rhs ) {
130 if (rhs.getLength()!=lhs.getLength()) return false;
131 for (int i=0; i<lhs.getLength()/8; i++)
132 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
133 return true;
134 }
135
136 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
137
138 // find entry
139 for (size_t i=0; i<entries.size(); i++) {
140 DHTEntry& entry = entries.at(i);
141
142 // check if key is already known
143 if ( equals(entry.key, key) ) {
144
145 // check if value is already in values list
146 for (size_t j=0; j<entry.values.size(); j++) {
147 // found value already? yes-> refresh ttl
148 if ( equals(entry.values[j].get_value(), value) ) {
149 entry.values[j].refresh();
150 std::cout << "DHT: Republished value. Refreshing TTL."
151 << std::endl;
152 return;
153 }
154 }
155
156 // new value-> add to entry
157 std::cout << "DHT: Added value to "
158 << " key=" << key << " with value=" << value << std::endl;
159 entry.values.push_back( ValueEntry( value ) );
160 entry.values.back().set_ttl(ttl);
161 return;
162 }
163 }
164
165 // key is unknown-> add key value pair
166 std::cout << "DHT: New key value pair "
167 << " key=" << key << " with value=" << value << std::endl;
168
169 // add new entry
170 entries.push_back( DHTEntry() );
171 DHTEntry& entry = entries.back();
172 entry.key = key.clone();
173 entry.values.push_back( ValueEntry(value) );
174 entry.values.back().set_ttl(ttl);
175 }
176
177 vector<Data> get( const Data& key ) {
178 // find entry
179 for (size_t i=0; i<entries.size(); i++) {
180 DHTEntry& entry = entries.at(i);
181 if ( equals(entry.key,key) )
182 return entry.get_values();
183 }
184 return vector<Data>();
185 }
186
187 bool remove( const Data& key ) {
188 // find entry
189 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
190 DHTEntry& entry = *i;
191
192 // found? yes-> delete entry
193 if ( equals(entry.key, key) ) {
194 i = entries.erase(i);
195 return true;
196 }
197 }
198 return false;
199 }
200
201 bool remove( const Data& key, const Data& value ) {
202 // find entry
203 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
204 DHTEntry& entry = *i;
205
206 // found? yes-> try to find value
207 if ( equals(entry.key, key) ) {
208 for (Values::iterator j = entry.values.begin();
209 j != entry.values.end(); j++) {
210
211 // value found? yes-> delete
212 if (equals(j->get_value(), value)) {
213 j = entry.values.erase(j);
214 return true;
215 }
216 }
217 }
218 }
219 return false;
220 }
221
222 void cleanup() {
223 // find entry
224 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
225 DHTEntry& entry = *i;
226
227 for (Values::iterator j = entry.values.begin();
228 j != entry.values.end(); j++) {
229
230 // value found? yes-> delete
231 if (j->is_ttl_elapsed())
232 j = entry.values.erase(j);
233 }
234
235 if (entry.values.size()==0) i = entries.erase(i);
236 }
237 }
238};
239
240// ----------------------------------------------------------------------------
241
242/* *****************************************************************************
243 * PREREQUISITES
244 * ****************************************************************************/
245
246CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
247 if( !communicationListeners.contains( service ) ) {
248 logging_error( "No listener found for service " << service.toString() );
249 return NULL;
250 }
251 CommunicationListener* listener = communicationListeners.get( service );
252 assert( listener != NULL );
253 return listener;
254}
255
256// link descriptor handling ----------------------------------------------------
257
258LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
259 BOOST_FOREACH( LinkDescriptor* lp, links )
260 if ((communication ? lp->communicationId : lp->overlayId) == link)
261 return lp;
262 return NULL;
263}
264
265const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
266 BOOST_FOREACH( const LinkDescriptor* lp, links )
267 if ((communication ? lp->communicationId : lp->overlayId) == link)
268 return lp;
269 return NULL;
270}
271
272/// erases a link descriptor
273void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
274 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
275 LinkDescriptor* ld = *i;
276 if ((communication ? ld->communicationId : ld->overlayId) == link) {
277 delete ld;
278 links.erase(i);
279 break;
280 }
281 }
282}
283
284/// adds a link descriptor
285LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
286 LinkDescriptor* desc = getDescriptor( link );
287 if ( desc == NULL ) {
288 desc = new LinkDescriptor();
289 if (!link.isUnspecified()) desc->overlayId = link;
290 links.push_back(desc);
291 }
292 return desc;
293}
294
295/// returns a auto-link descriptor
296LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
297 // search for a descriptor that is already up
298 BOOST_FOREACH( LinkDescriptor* lp, links )
299 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
300 return lp;
301 // if not found, search for one that is about to come up...
302 BOOST_FOREACH( LinkDescriptor* lp, links )
303 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
304 return lp;
305 return NULL;
306}
307
308/// stabilizes link information
309void BaseOverlay::stabilizeLinks() {
310 // send keep-alive messages over established links
311 BOOST_FOREACH( LinkDescriptor* ld, links ) {
312 if (!ld->up) continue;
313 OverlayMsg msg( OverlayMsg::typeLinkAlive,
314 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
315 if (ld->relayed) msg.setRouteRecord(true);
316 send_link( &msg, ld->overlayId );
317 }
318
319 // iterate over all links and check for time boundaries
320 vector<LinkDescriptor*> oldlinks;
321 time_t now = time(NULL);
322 BOOST_FOREACH( LinkDescriptor* ld, links ) {
323
324 // keep alives and not up? yes-> link connection request is stale!
325 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
326
327 // increase counter
328 ld->keepAliveMissed++;
329
330 // missed more than four keep-alive messages (10 sec)? -> drop link
331 if (ld->keepAliveMissed > 4) {
332 logging_info( "Link connection request is stale, closing: " << ld );
333 oldlinks.push_back( ld );
334 continue;
335 }
336 }
337
338 if (!ld->up) continue;
339
340 // remote used as relay flag
341 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
342 ld->relaying = false;
343
344 // drop links that are dropped and not used as relay
345 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
346 oldlinks.push_back( ld );
347 continue;
348 }
349
350 // auto-link time exceeded?
351 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
352 oldlinks.push_back( ld );
353 continue;
354 }
355
356 // keep alives missed? yes->
357 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
358
359 // increase counter
360 ld->keepAliveMissed++;
361
362 // missed more than four keep-alive messages (4 sec)? -> drop link
363 if (ld->keepAliveMissed >= 4) {
364 logging_info( "Link is stale, closing: " << ld );
365 oldlinks.push_back( ld );
366 continue;
367 }
368 }
369 }
370
371 // drop links
372 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
373 logging_info( "Link timed out. Dropping " << ld );
374 ld->relaying = false;
375 dropLink( ld->overlayId );
376 }
377
378 // show link state
379 counter++;
380 if (counter>=4) showLinks();
381 if (counter>=4 || counter<0) counter = 0;
382}
383
384
385std::string BaseOverlay::getLinkHTMLInfo() {
386 std::ostringstream s;
387 vector<NodeID> nodes;
388 if (links.size()==0) {
389 s << "<h2 style=\"color=#606060\">No links established!</h2>";
390 } else {
391 s << "<h2 style=\"color=#606060\">Links</h2>";
392 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
393 s << "<tr style=\"background-color=#ffe0e0\">";
394 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
395 s << "</tr>";
396
397 int i=0;
398 BOOST_FOREACH( LinkDescriptor* ld, links ) {
399 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
400 bool found = false;
401 BOOST_FOREACH(NodeID& id, nodes)
402 if (id == ld->remoteNode) found = true;
403 if (found) continue;
404 i++;
405 nodes.push_back(ld->remoteNode);
406 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
407 else s << "<tr>";
408 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
409 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
410 s << "<td>";
411 if (ld->routeRecord.size()>1 && ld->relayed) {
412 for (size_t i=1; i<ld->routeRecord.size(); i++)
413 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
414 } else {
415 s << "Direct";
416 }
417 s << "</td>";
418 s << "</tr>";
419 }
420 s << "</table>";
421 }
422 return s.str();
423}
424
425/// shows the current link state
426void BaseOverlay::showLinks() {
427 int i=0;
428 logging_info("--- link state -------------------------------");
429 BOOST_FOREACH( LinkDescriptor* ld, links ) {
430 logging_info("link " << i << ": " << ld);
431 i++;
432 }
433 logging_info("----------------------------------------------");
434}
435
436/// compares two arbitrary links to the same node
437int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
438 LinkDescriptor* lhsld = getDescriptor(lhs);
439 LinkDescriptor* rhsld = getDescriptor(rhs);
440 if (lhsld==NULL || rhsld==NULL
441 || !lhsld->up || !rhsld->up
442 || lhsld->remoteNode != rhsld->remoteNode) return -1;
443
444 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
445 return -1;
446
447 return 1;
448}
449
450
451// internal message delivery ---------------------------------------------------
452
453/// routes a message to its destination node
454void BaseOverlay::route( OverlayMsg* message ) {
455
456 // exceeded time-to-live? yes-> drop message
457 if (message->getNumHops() > message->getTimeToLive()) {
458 logging_warn("Message exceeded TTL. Dropping message and relay routes"
459 "for recovery.");
460 removeRelayNode(message->getDestinationNode());
461 return;
462 }
463
464 // no-> forward message
465 else {
466 // destinastion myself? yes-> handle message
467 if (message->getDestinationNode() == nodeId) {
468 logging_warn("Usually I should not route messages to myself!");
469 Message msg;
470 msg.encapsulate(message);
471 handleMessage( &msg, NULL );
472 } else {
473 // no->send message to next hop
474 send( message, message->getDestinationNode() );
475 }
476 }
477}
478
479/// sends a message to another node, delivers it to the base overlay class
480seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
481 LinkDescriptor* next_link = NULL;
482
483 // drop messages to unspecified destinations
484 if (destination.isUnspecified()) return -1;
485
486 // send messages to myself -> handle message and drop warning!
487 if (destination == nodeId) {
488 logging_warn("Sent message to myself. Handling message.")
489 Message msg;
490 msg.encapsulate(message);
491 handleMessage( &msg, NULL );
492 return -1;
493 }
494
495 // use relay path?
496 if (message->isRelayed()) {
497 next_link = getRelayLinkTo( destination );
498 if (next_link != NULL) {
499 next_link->setRelaying();
500 return bc->sendMessage(next_link->communicationId, message);
501 } else {
502 logging_warn("Could not send message. No relay hop found to "
503 << destination)
504 return -1;
505 }
506 }
507
508 // routed message
509 else {
510 // no-> relay path! route over overlay path
511 LinkID next_id = overlayInterface->getNextLinkId( destination );
512 if (next_id.isUnspecified()) {
513 logging_warn("Could not send message. No next hop found to " <<
514 destination );
515 return -1;
516 }
517
518 // get link descriptor, up and running? yes-> send message
519 next_link = getDescriptor(next_id);
520 if (next_link != NULL && next_link->up) {
521 // send message over relayed link
522 return send(message, next_link);
523 }
524
525 // no-> error, dropping message
526 else {
527 logging_warn("Could not send message. Link not known or up");
528 return -1;
529 }
530 }
531
532 // not reached-> fail
533 return -1;
534}
535
536/// send a message using a link descriptor, delivers it to the base overlay class
537seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
538 // check if null
539 if (ldr == NULL) {
540 logging_error("Can not send message to " << message->getDestinationAddress());
541 return -1;
542 }
543
544 // check if up
545 if (!ldr->up && !ignore_down) {
546 logging_error("Can not send message. Link not up:" << ldr );
547 return -1;
548 }
549 LinkDescriptor* ld = NULL;
550
551 // handle relayed link
552 if (ldr->relayed) {
553 logging_debug("Resolving direct link for relayed link to "
554 << ldr->remoteNode);
555 ld = getRelayLinkTo( ldr->remoteNode );
556 if (ld==NULL) {
557 logging_error("No relay path found to link " << ldr );
558 return -1;
559 }
560 ld->setRelaying();
561 message->setRelayed(true);
562 } else
563 ld = ldr;
564
565 // handle direct link
566 if (ld->communicationUp) {
567 logging_debug("send(): Sending message over direct link.");
568 return bc->sendMessage( ld->communicationId, message );
569 } else {
570 logging_error("send(): Could not send message. "
571 "Not a relayed link and direct link is not up.");
572 return -1;
573 }
574 return -1;
575}
576
577seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
578 const ServiceID& service) {
579 message->setSourceNode(nodeId);
580 message->setDestinationNode(remote);
581 message->setService(service);
582 send( message, remote );
583}
584
585seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
586 LinkDescriptor* ld = getDescriptor(link);
587 if (ld==NULL) {
588 logging_error("Cannot find descriptor to link id=" << link.toString());
589 return -1;
590 }
591 message->setSourceNode(nodeId);
592 message->setDestinationNode(ld->remoteNode);
593
594 message->setSourceLink(ld->overlayId);
595 message->setDestinationLink(ld->remoteLink);
596
597 message->setService(ld->service);
598 message->setRelayed(ld->relayed);
599 return send( message, ld, ignore_down );
600}
601
602// relay route management ------------------------------------------------------
603
604/// stabilize relay information
605void BaseOverlay::stabilizeRelays() {
606 vector<relay_route>::iterator i = relay_routes.begin();
607 while (i!=relay_routes.end() ) {
608 relay_route& route = *i;
609 LinkDescriptor* ld = getDescriptor(route.link);
610
611 // relay link still used and alive?
612 if (ld==NULL
613 || !ld->isDirectVital()
614 || difftime(route.used, time(NULL)) > 8) {
615 logging_info("Forgetting relay information to node "
616 << route.node.toString() );
617 i = relay_routes.erase(i);
618 } else
619 i++;
620 }
621}
622
623void BaseOverlay::removeRelayLink( const LinkID& link ) {
624 vector<relay_route>::iterator i = relay_routes.begin();
625 while (i!=relay_routes.end() ) {
626 relay_route& route = *i;
627 if (route.link == link ) i = relay_routes.erase(i); else i++;
628 }
629}
630
631void BaseOverlay::removeRelayNode( const NodeID& remote ) {
632 vector<relay_route>::iterator i = relay_routes.begin();
633 while (i!=relay_routes.end() ) {
634 relay_route& route = *i;
635 if (route.node == remote ) i = relay_routes.erase(i); else i++;
636 }
637}
638
639/// refreshes relay information
640void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
641
642 // handle relayed messages from real links only
643 if (ld == NULL
644 || ld->relayed
645 || message->getSourceNode()==nodeId ) return;
646
647 // update usage information
648 if (message->isRelayed()) {
649 // try to find source node
650 BOOST_FOREACH( relay_route& route, relay_routes ) {
651 // relay route found? yes->
652 if ( route.node == message->getDestinationNode() ) {
653 ld->setRelaying();
654 route.used = time(NULL);
655 }
656 }
657
658 }
659
660 // register relay path
661 if (message->isRegisterRelay()) {
662 // set relaying
663 ld->setRelaying();
664
665 // try to find source node
666 BOOST_FOREACH( relay_route& route, relay_routes ) {
667
668 // relay route found? yes->
669 if ( route.node == message->getSourceNode() ) {
670
671 // refresh timer
672 route.used = time(NULL);
673 LinkDescriptor* rld = getDescriptor(route.link);
674
675 // route has a shorter hop count or old link is dead? yes-> replace
676 if (route.hops > message->getNumHops()
677 || rld == NULL
678 || !rld->isDirectVital()) {
679 logging_info("Updating relay information to node "
680 << route.node.toString()
681 << " reducing to " << message->getNumHops() << " hops.");
682 route.hops = message->getNumHops();
683 route.link = ld->overlayId;
684 }
685 return;
686 }
687 }
688
689 // not found-> add new entry
690 relay_route route;
691 route.hops = message->getNumHops();
692 route.link = ld->overlayId;
693 route.node = message->getSourceNode();
694 route.used = time(NULL);
695 logging_info("Remembering relay information to node "
696 << route.node.toString());
697 relay_routes.push_back(route);
698 }
699}
700
701/// returns a known "vital" relay link which is up and running
702LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
703 // try to find source node
704 BOOST_FOREACH( relay_route& route, relay_routes ) {
705 if (route.node == remote ) {
706 LinkDescriptor* ld = getDescriptor( route.link );
707 if (ld==NULL || !ld->isDirectVital()) return NULL; else {
708 route.used = time(NULL);
709 return ld;
710 }
711 }
712 }
713 return NULL;
714}
715
716/* *****************************************************************************
717 * PUBLIC MEMBERS
718 * ****************************************************************************/
719
720use_logging_cpp(BaseOverlay);
721
722// ----------------------------------------------------------------------------
723
724BaseOverlay::BaseOverlay() :
725 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
726 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
727 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
728 dht = new DHT();
729 localDHT = new DHT();
730}
731
732BaseOverlay::~BaseOverlay() {
733 delete dht;
734}
735
736// ----------------------------------------------------------------------------
737
738void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
739 logging_info("Starting...");
740
741 // set parameters
742 bc = &_basecomm;
743 nodeId = _nodeid;
744
745 // register at base communication
746 bc->registerMessageReceiver( this );
747 bc->registerEventListener( this );
748
749 // timer for auto link management
750 Timer::setInterval( 1000 );
751 Timer::start();
752
753 started = true;
754 state = BaseOverlayStateInvalid;
755}
756
757void BaseOverlay::stop() {
758 logging_info("Stopping...");
759
760 // stop timer
761 Timer::stop();
762
763 // delete oberlay interface
764 if(overlayInterface != NULL) {
765 delete overlayInterface;
766 overlayInterface = NULL;
767 }
768
769 // unregister at base communication
770 bc->unregisterMessageReceiver( this );
771 bc->unregisterEventListener( this );
772
773 started = false;
774 state = BaseOverlayStateInvalid;
775}
776
777bool BaseOverlay::isStarted(){
778 return started;
779}
780
781// ----------------------------------------------------------------------------
782
783void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
784 const EndpointDescriptor& bootstrapEp) {
785
786 if(id != spovnetId){
787 logging_error("attempt to join against invalid spovnet, call initiate first");
788 return;
789 }
790
791
792 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
793 logging_info( "Starting to join spovnet " << id.toString() <<
794 " with nodeid " << nodeId.toString());
795
796 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
797
798 // bootstrap against ourselfs
799 logging_debug("joining spovnet locally");
800
801 overlayInterface->joinOverlay();
802 state = BaseOverlayStateCompleted;
803 BOOST_FOREACH( NodeListener* i, nodeListeners )
804 i->onJoinCompleted( spovnetId );
805
806 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
807 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
808
809 logging_debug("starting overlay bootstrap module");
810 overlayBootstrap.start(this, spovnetId, nodeId);
811 overlayBootstrap.publish(bc->getEndpointDescriptor());
812
813 } else {
814
815 // bootstrap against another node
816 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
817
818 const LinkID& lnk = bc->establishLink( bootstrapEp );
819 bootstrapLinks.push_back(lnk);
820 logging_info("join process initiated for " << id.toString() << "...");
821 }
822}
823
824void BaseOverlay::leaveSpoVNet() {
825
826 logging_info( "Leaving spovnet " << spovnetId );
827 bool ret = ( state != this->BaseOverlayStateInvalid );
828
829 logging_debug("stopping overlay bootstrap module");
830 overlayBootstrap.stop();
831 overlayBootstrap.revoke();
832
833 logging_debug( "Dropping all auto-links" );
834
835 // gather all service links
836 vector<LinkID> servicelinks;
837 BOOST_FOREACH( LinkDescriptor* ld, links ) {
838 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
839 servicelinks.push_back( ld->overlayId );
840 }
841
842 // drop all service links
843 BOOST_FOREACH( LinkID lnk, servicelinks )
844 dropLink( lnk );
845
846 // let the node leave the spovnet overlay interface
847 logging_debug( "Leaving overlay" );
848 if( overlayInterface != NULL )
849 overlayInterface->leaveOverlay();
850
851 // drop still open bootstrap links
852 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
853 bc->dropLink( lnk );
854
855 // change to inalid state
856 state = BaseOverlayStateInvalid;
857 //ovl.visShutdown( ovlId, nodeId, string("") );
858
859 // inform all registered services of the event
860 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
861 if( ret ) i->onLeaveCompleted( spovnetId );
862 else i->onLeaveFailed( spovnetId );
863 }
864}
865
866void BaseOverlay::createSpoVNet(const SpoVNetID& id,
867 const OverlayParameterSet& param,
868 const SecurityParameterSet& sec,
869 const QoSParameterSet& qos) {
870
871 // set the state that we are an initiator, this way incoming messages are
872 // handled correctly
873 logging_info( "creating spovnet " + id.toString() <<
874 " with nodeid " << nodeId.toString() );
875
876 spovnetId = id;
877
878 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
879 if( overlayInterface == NULL ) {
880 logging_fatal( "overlay structure not supported" );
881 state = BaseOverlayStateInvalid;
882
883 BOOST_FOREACH( NodeListener* i, nodeListeners )
884 i->onJoinFailed( spovnetId );
885
886 return;
887 }
888}
889
890// ----------------------------------------------------------------------------
891
892const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
893 const NodeID& remoteId, const ServiceID& service ) {
894
895 // establish link via overlay
896 if (!remoteId.isUnspecified())
897 return establishLink( remoteId, service );
898 else
899
900 // establish link directly if only ep is known
901 if (remoteId.isUnspecified())
902 return establishDirectLink(remoteEp, service );
903
904}
905
906/// call base communication's establish link and add link mapping
907const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
908 const ServiceID& service ) {
909
910 /// find a service listener
911 if( !communicationListeners.contains( service ) ) {
912 logging_error( "No listener registered for service id=" << service.toString() );
913 return LinkID::UNSPECIFIED;
914 }
915 CommunicationListener* listener = communicationListeners.get( service );
916 assert( listener != NULL );
917
918 // create descriptor
919 LinkDescriptor* ld = addDescriptor();
920 ld->relayed = false;
921 ld->listener = listener;
922 ld->service = service;
923 ld->communicationId = bc->establishLink( ep );
924
925 /// establish link and add mapping
926 logging_info("Establishing direct link " << ld->communicationId.toString()
927 << " using " << ep.toString());
928
929 return ld->communicationId;
930}
931
932/// establishes a link between two arbitrary nodes
933const LinkID BaseOverlay::establishLink( const NodeID& remote,
934 const ServiceID& service ) {
935
936 // do not establish a link to myself!
937 if (remote == nodeId) return LinkID::UNSPECIFIED;
938
939 // create a link descriptor
940 LinkDescriptor* ld = addDescriptor();
941 ld->relayed = true;
942 ld->remoteNode = remote;
943 ld->service = service;
944 ld->listener = getListener(ld->service);
945
946 // create link request message
947 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
948 msg.setSourceLink(ld->overlayId);
949 msg.setRelayed(true);
950
951 // debug message
952 logging_info(
953 "Sending link request with"
954 << " link=" << ld->overlayId.toString()
955 << " node=" << ld->remoteNode.toString()
956 << " serv=" << ld->service.toString()
957 );
958
959 // sending message to node
960 send_node( &msg, ld->remoteNode, ld->service );
961
962 return ld->overlayId;
963}
964
965/// drops an established link
966void BaseOverlay::dropLink(const LinkID& link) {
967 logging_info( "Dropping link (initiated locally):" << link.toString() );
968
969 // find the link item to drop
970 LinkDescriptor* ld = getDescriptor(link);
971 if( ld == NULL ) {
972 logging_warn( "Can't drop link, link is unknown!");
973 return;
974 }
975
976 // delete all queued messages
977 if( ld->messageQueue.size() > 0 ) {
978 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
979 << ld->messageQueue.size() << " waiting messages" );
980 ld->flushQueue();
981 }
982
983 // inform sideport and listener
984 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
985 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
986
987 // do not drop relay links
988 if (!ld->relaying) {
989 // drop the link in base communication
990 if (ld->communicationUp) bc->dropLink( ld->communicationId );
991
992 // erase descriptor
993 eraseDescriptor( ld->overlayId );
994 } else {
995 ld->dropAfterRelaying = true;
996 }
997}
998
999// ----------------------------------------------------------------------------
1000
1001/// internal send message, always use this functions to send messages over links
1002seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
1003 logging_debug( "Sending data message on link " << link.toString() );
1004
1005 // get the mapping for this link
1006 LinkDescriptor* ld = getDescriptor(link);
1007 if( ld == NULL ) {
1008 logging_error("Could not send message. "
1009 << "Link not found id=" << link.toString());
1010 return -1;
1011 }
1012
1013 // check if the link is up yet, if its an auto link queue message
1014 if( !ld->up ) {
1015 ld->setAutoUsed();
1016 if( ld->autolink ) {
1017 logging_info("Auto-link " << link.toString() << " not up, queue message");
1018 Data data = data_serialize( message );
1019 const_cast<Message*>(message)->dropPayload();
1020 ld->messageQueue.push_back( new Message(data) );
1021 } else {
1022 logging_error("Link " << link.toString() << " not up, drop message");
1023 }
1024 return -1;
1025 }
1026
1027 // compile overlay message (has service and node id)
1028 OverlayMsg overmsg( OverlayMsg::typeData );
1029 overmsg.encapsulate( const_cast<Message*>(message) );
1030
1031 // send message over relay/direct/overlay
1032 return send_link( &overmsg, ld->overlayId );
1033}
1034
1035seqnum_t BaseOverlay::sendMessage(const Message* message,
1036 const NodeID& node, const ServiceID& service) {
1037
1038 // find link for node and service
1039 LinkDescriptor* ld = getAutoDescriptor( node, service );
1040
1041 // if we found no link, create an auto link
1042 if( ld == NULL ) {
1043
1044 // debug output
1045 logging_info( "No link to send message to node "
1046 << node.toString() << " found for service "
1047 << service.toString() << ". Creating auto link ..."
1048 );
1049
1050 // call base overlay to create a link
1051 LinkID link = establishLink( node, service );
1052 ld = getDescriptor( link );
1053 if( ld == NULL ) {
1054 logging_error( "Failed to establish auto-link.");
1055 return -1;
1056 }
1057 ld->autolink = true;
1058
1059 logging_debug( "Auto-link establishment in progress to node "
1060 << node.toString() << " with link id=" << link.toString() );
1061 }
1062 assert(ld != NULL);
1063
1064 // mark the link as used, as we now send a message through it
1065 ld->setAutoUsed();
1066
1067 // send / queue message
1068 return sendMessage( message, ld->overlayId );
1069}
1070
1071// ----------------------------------------------------------------------------
1072
1073const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1074 const LinkID& link) const {
1075
1076 // return own end-point descriptor
1077 if( link == LinkID::UNSPECIFIED )
1078 return bc->getEndpointDescriptor();
1079
1080 // find link descriptor. not found -> return unspecified
1081 const LinkDescriptor* ld = getDescriptor(link);
1082 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1083
1084 // return endpoint-descriptor from base communication
1085 return bc->getEndpointDescriptor( ld->communicationId );
1086}
1087
1088const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1089 const NodeID& node) const {
1090
1091 // return own end-point descriptor
1092 if( node == nodeId || node == NodeID::UNSPECIFIED )
1093 return bc->getEndpointDescriptor();
1094
1095 // no joined and request remote descriptor? -> fail!
1096 if( overlayInterface == NULL ) {
1097 logging_error( "overlay interface not set, cannot resolve endpoint" );
1098 return EndpointDescriptor::UNSPECIFIED();
1099 }
1100
1101 // resolve end-point descriptor from the base-overlay routing table
1102 const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1103 if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1104
1105 // see if we can find the node in our own table
1106 BOOST_FOREACH(const LinkDescriptor* ld, links){
1107 if(ld->remoteNode != node) continue;
1108 const EndpointDescriptor& ep = bc->getEndpointDescriptor(ld->communicationId);
1109 if(ep.toString().size()==0) continue;
1110 if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1111 }
1112
1113 return EndpointDescriptor::UNSPECIFIED();
1114}
1115
1116// ----------------------------------------------------------------------------
1117
1118bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1119 sideport = _sideport;
1120 _sideport->configure( this );
1121}
1122
1123bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1124 sideport = &SideportListener::DEFAULT;
1125}
1126
1127// ----------------------------------------------------------------------------
1128
1129bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1130 logging_debug( "binding communication listener " << listener
1131 << " on serviceid " << sid.toString() );
1132
1133 if( communicationListeners.contains( sid ) ) {
1134 logging_error( "some listener already registered for service id "
1135 << sid.toString() );
1136 return false;
1137 }
1138
1139 communicationListeners.registerItem( listener, sid );
1140 return true;
1141}
1142
1143
1144bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1145 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1146
1147 if( !communicationListeners.contains( sid ) ) {
1148 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1149 return false;
1150 }
1151
1152 if( communicationListeners.get(sid) != listener ) {
1153 logging_warn( "listener bound to service id " << sid.toString()
1154 << " is different than listener trying to unbind" );
1155 return false;
1156 }
1157
1158 communicationListeners.unregisterItem( sid );
1159 return true;
1160}
1161
1162// ----------------------------------------------------------------------------
1163
1164bool BaseOverlay::bind(NodeListener* listener) {
1165 logging_debug( "Binding node listener " << listener );
1166
1167 // already bound? yes-> warning
1168 NodeListenerVector::iterator i =
1169 find( nodeListeners.begin(), nodeListeners.end(), listener );
1170 if( i != nodeListeners.end() ) {
1171 logging_warn("Node listener " << listener << " is already bound!" );
1172 return false;
1173 }
1174
1175 // no-> add
1176 nodeListeners.push_back( listener );
1177 return true;
1178}
1179
1180bool BaseOverlay::unbind(NodeListener* listener) {
1181 logging_debug( "Unbinding node listener " << listener );
1182
1183 // already unbound? yes-> warning
1184 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1185 if( i == nodeListeners.end() ) {
1186 logging_warn( "Node listener " << listener << " is not bound!" );
1187 return false;
1188 }
1189
1190 // no-> remove
1191 nodeListeners.erase( i );
1192 return true;
1193}
1194
1195// ----------------------------------------------------------------------------
1196
1197void BaseOverlay::onLinkUp(const LinkID& id,
1198 const address_v* local, const address_v* remote) {
1199 logging_debug( "Link up with base communication link id=" << id );
1200
1201 // get descriptor for link
1202 LinkDescriptor* ld = getDescriptor(id, true);
1203
1204 // handle bootstrap link we initiated
1205 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1206 logging_info(
1207 "Join has been initiated by me and the link is now up. " <<
1208 "Sending out join request for SpoVNet " << spovnetId.toString()
1209 );
1210
1211 // send join request message
1212 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1213 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1214 JoinRequest joinRequest( spovnetId, nodeId );
1215 overlayMsg.encapsulate( &joinRequest );
1216 bc->sendMessage( id, &overlayMsg );
1217 return;
1218 }
1219
1220 // no link found? -> link establishment from remote, add one!
1221 if (ld == NULL) {
1222 ld = addDescriptor( id );
1223 logging_info( "onLinkUp (remote request) descriptor: " << ld );
1224
1225 // update descriptor
1226 ld->fromRemote = true;
1227 ld->communicationId = id;
1228 ld->communicationUp = true;
1229 ld->setAutoUsed();
1230 ld->setAlive();
1231
1232 // in this case, do not inform listener, since service it unknown
1233 // -> wait for update message!
1234
1235 // link mapping found? -> send update message with node-id and service id
1236 } else {
1237 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1238
1239 // update descriptor
1240 ld->setAutoUsed();
1241 ld->setAlive();
1242 ld->communicationUp = true;
1243 ld->fromRemote = false;
1244
1245 // if link is a relayed link->convert to direct link
1246 if (ld->relayed) {
1247 logging_info( "Converting to direct link: " << ld );
1248 ld->up = true;
1249 ld->relayed = false;
1250 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1251 overMsg.setSourceLink( ld->overlayId );
1252 overMsg.setDestinationLink( ld->remoteLink );
1253 send_link( &overMsg, ld->overlayId );
1254 } else {
1255 // note: necessary to validate the link on the remote side!
1256 logging_info( "Sending out update" <<
1257 " for service " << ld->service.toString() <<
1258 " with local node id " << nodeId.toString() <<
1259 " on link " << ld->overlayId.toString() );
1260
1261 // compile and send update message
1262 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1263 overlayMsg.setSourceLink(ld->overlayId);
1264 overlayMsg.setAutoLink( ld->autolink );
1265 send_link( &overlayMsg, ld->overlayId, true );
1266 }
1267 }
1268}
1269
1270void BaseOverlay::onLinkDown(const LinkID& id,
1271 const address_v* local, const address_v* remote) {
1272
1273 // erase bootstrap links
1274 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1275 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1276
1277 // get descriptor for link
1278 LinkDescriptor* ld = getDescriptor(id, true);
1279 if ( ld == NULL ) return; // not found? ->ignore!
1280 logging_info( "onLinkDown descriptor: " << ld );
1281
1282 // removing relay link information
1283 removeRelayLink(ld->overlayId);
1284
1285 // inform listeners about link down
1286 ld->communicationUp = false;
1287 if (!ld->service.isUnspecified()) {
1288 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1289 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1290 }
1291
1292 // delete all queued messages (auto links)
1293 if( ld->messageQueue.size() > 0 ) {
1294 logging_warn( "Dropping link " << id.toString() << " that has "
1295 << ld->messageQueue.size() << " waiting messages" );
1296 ld->flushQueue();
1297 }
1298
1299 // erase mapping
1300 eraseDescriptor(ld->overlayId);
1301}
1302
1303void BaseOverlay::onLinkChanged(const LinkID& id,
1304 const address_v* oldlocal, const address_v* newlocal,
1305 const address_v* oldremote, const address_v* newremote) {
1306
1307 // get descriptor for link
1308 LinkDescriptor* ld = getDescriptor(id, true);
1309 if ( ld == NULL ) return; // not found? ->ignore!
1310 logging_debug( "onLinkChanged descriptor: " << ld );
1311
1312 // inform listeners
1313 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1314 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1315
1316 // autolinks: refresh timestamp
1317 ld->setAutoUsed();
1318}
1319
1320void BaseOverlay::onLinkFail(const LinkID& id,
1321 const address_v* local, const address_v* remote) {
1322 logging_debug( "Link fail with base communication link id=" << id );
1323
1324 // erase bootstrap links
1325 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1326 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1327
1328 // get descriptor for link
1329 LinkDescriptor* ld = getDescriptor(id, true);
1330 if ( ld == NULL ) return; // not found? ->ignore!
1331 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1332
1333 // inform listeners
1334 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1335 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1336}
1337
1338void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1339 const address_v* remote, const QoSParameterSet& qos) {
1340 logging_debug( "Link quality changed with base communication link id=" << id );
1341
1342 // get descriptor for link
1343 LinkDescriptor* ld = getDescriptor(id, true);
1344 if ( ld == NULL ) return; // not found? ->ignore!
1345 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1346}
1347
1348bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1349 const address_v* remote ) {
1350 logging_debug("Accepting link request from " << remote->to_string() );
1351 return true;
1352}
1353
1354/// handles a message from base communication
1355bool BaseOverlay::receiveMessage(const Message* message,
1356 const LinkID& link, const NodeID& ) {
1357 // get descriptor for link
1358 LinkDescriptor* ld = getDescriptor( link, true );
1359 return handleMessage( message, ld, link );
1360}
1361
1362// ----------------------------------------------------------------------------
1363
1364/// Handle spovnet instance join requests
1365bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1366
1367 // decapsulate message
1368 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1369 logging_info( "Received join request for spovnet " <<
1370 joinReq->getSpoVNetID().toString() );
1371
1372 // check spovnet id
1373 if( joinReq->getSpoVNetID() != spovnetId ) {
1374 logging_error(
1375 "Received join request for spovnet we don't handle " <<
1376 joinReq->getSpoVNetID().toString() );
1377 return false;
1378 }
1379
1380 // TODO: here you can implement mechanisms to deny joining of a node
1381 bool allow = true;
1382 logging_info( "Sending join reply for spovnet " <<
1383 spovnetId.toString() << " to node " <<
1384 overlayMsg->getSourceNode().toString() <<
1385 ". Result: " << (allow ? "allowed" : "denied") );
1386 joiningNodes.push_back( overlayMsg->getSourceNode() );
1387
1388 // return overlay parameters
1389 assert( overlayInterface != NULL );
1390 logging_debug( "Using bootstrap end-point "
1391 << getEndpointDescriptor().toString() )
1392 OverlayParameterSet parameters = overlayInterface->getParameters();
1393 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1394 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1395 JoinReply replyMsg( spovnetId, parameters,
1396 allow, getEndpointDescriptor() );
1397 retmsg.encapsulate(&replyMsg);
1398 bc->sendMessage( bcLink, &retmsg );
1399
1400 return true;
1401}
1402
1403/// Handle replies to spovnet instance join requests
1404bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1405 // decapsulate message
1406 logging_debug("received join reply message");
1407 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1408
1409 // correct spovnet?
1410 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1411 logging_error( "Received SpoVNet join reply for " <<
1412 replyMsg->getSpoVNetID().toString() <<
1413 " != " << spovnetId.toString() );
1414 delete replyMsg;
1415 return false;
1416 }
1417
1418 // access granted? no -> fail
1419 if( !replyMsg->getJoinAllowed() ) {
1420 logging_error( "Our join request has been denied" );
1421
1422 // drop initiator link
1423 if( !bcLink.isUnspecified() ){
1424 bc->dropLink( bcLink );
1425
1426 vector<LinkID>::iterator it = std::find(
1427 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1428 if( it != bootstrapLinks.end() )
1429 bootstrapLinks.erase(it);
1430 }
1431
1432 // inform all registered services of the event
1433 BOOST_FOREACH( NodeListener* i, nodeListeners )
1434 i->onJoinFailed( spovnetId );
1435
1436 delete replyMsg;
1437 return true;
1438 }
1439
1440 // access has been granted -> continue!
1441 logging_info("Join request has been accepted for spovnet " <<
1442 spovnetId.toString() );
1443
1444 logging_debug( "Using bootstrap end-point "
1445 << replyMsg->getBootstrapEndpoint().toString() );
1446
1447 // create overlay structure from spovnet parameter set
1448 // if we have not boostrapped yet against some other node
1449 if( overlayInterface == NULL ){
1450
1451 logging_debug("first-time bootstrapping");
1452
1453 overlayInterface = OverlayFactory::create(
1454 *this, replyMsg->getParam(), nodeId, this );
1455
1456 // overlay structure supported? no-> fail!
1457 if( overlayInterface == NULL ) {
1458 logging_error( "overlay structure not supported" );
1459
1460 if( !bcLink.isUnspecified() ){
1461 bc->dropLink( bcLink );
1462
1463 vector<LinkID>::iterator it = std::find(
1464 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1465 if( it != bootstrapLinks.end() )
1466 bootstrapLinks.erase(it);
1467 }
1468
1469 // inform all registered services of the event
1470 BOOST_FOREACH( NodeListener* i, nodeListeners )
1471 i->onJoinFailed( spovnetId );
1472
1473 delete replyMsg;
1474 return true;
1475 }
1476
1477 // everything ok-> join the overlay!
1478 state = BaseOverlayStateCompleted;
1479 overlayInterface->createOverlay();
1480
1481 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1482 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1483
1484 // update ovlvis
1485 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1486
1487 // inform all registered services of the event
1488 BOOST_FOREACH( NodeListener* i, nodeListeners )
1489 i->onJoinCompleted( spovnetId );
1490
1491 delete replyMsg;
1492
1493 } else {
1494
1495 // this is not the first bootstrap, just join the additional node
1496 logging_debug("not first-time bootstrapping");
1497 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1498 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1499
1500 delete replyMsg;
1501
1502 } // if( overlayInterface == NULL )
1503
1504 return true;
1505}
1506
1507
1508bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1509 // get service
1510 const ServiceID& service = overlayMsg->getService();
1511 logging_debug( "Received data for service " << service.toString()
1512 << " on link " << overlayMsg->getDestinationLink().toString() );
1513
1514 // delegate data message
1515 getListener(service)->onMessage(
1516 overlayMsg,
1517 overlayMsg->getSourceNode(),
1518 overlayMsg->getDestinationLink()
1519 );
1520
1521 return true;
1522}
1523
1524
1525bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1526
1527 if( ld == NULL ) {
1528 logging_warn( "received overlay update message for link for "
1529 << "which we have no mapping" );
1530 return false;
1531 }
1532 logging_info("Received type update message on link " << ld );
1533
1534 // update our link mapping information for this link
1535 bool changed =
1536 ( ld->remoteNode != overlayMsg->getSourceNode() )
1537 || ( ld->service != overlayMsg->getService() );
1538
1539 // set parameters
1540 ld->up = true;
1541 ld->remoteNode = overlayMsg->getSourceNode();
1542 ld->remoteLink = overlayMsg->getSourceLink();
1543 ld->service = overlayMsg->getService();
1544 ld->autolink = overlayMsg->isAutoLink();
1545
1546 // if our link information changed, we send out an update, too
1547 if( changed ) {
1548 overlayMsg->swapRoles();
1549 overlayMsg->setSourceNode(nodeId);
1550 overlayMsg->setSourceLink(ld->overlayId);
1551 overlayMsg->setService(ld->service);
1552 send( overlayMsg, ld );
1553 }
1554
1555 // service registered? no-> error!
1556 if( !communicationListeners.contains( ld->service ) ) {
1557 logging_warn( "Link up: event listener has not been registered" );
1558 return false;
1559 }
1560
1561 // default or no service registered?
1562 CommunicationListener* listener = communicationListeners.get( ld->service );
1563 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1564 logging_warn("Link up: event listener is default or null!" );
1565 return true;
1566 }
1567
1568 // update descriptor
1569 ld->listener = listener;
1570 ld->setAutoUsed();
1571 ld->setAlive();
1572
1573 // ask the service whether it wants to accept this link
1574 if( !listener->onLinkRequest(ld->remoteNode) ) {
1575
1576 logging_debug("Link id=" << ld->overlayId.toString() <<
1577 " has been denied by service " << ld->service.toString() << ", dropping link");
1578
1579 // prevent onLinkDown calls to the service
1580 ld->listener = &CommunicationListener::DEFAULT;
1581
1582 // drop the link
1583 dropLink( ld->overlayId );
1584 return true;
1585 }
1586
1587 // set link up
1588 ld->up = true;
1589 logging_info( "Link has been accepted by service and is up: " << ld );
1590
1591 // auto links: link has been accepted -> send queued messages
1592 if( ld->messageQueue.size() > 0 ) {
1593 logging_info( "Sending out queued messages on link " << ld );
1594 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1595 sendMessage( msg, ld->overlayId );
1596 delete msg;
1597 }
1598 ld->messageQueue.clear();
1599 }
1600
1601 // call the notification functions
1602 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1603 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1604
1605 return true;
1606}
1607
1608/// handle a link request and reply
1609bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1610 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1611
1612 //TODO: Check if a request has already been sent using getSourceLink() ...
1613
1614 // create link descriptor
1615 LinkDescriptor* ldn = addDescriptor();
1616
1617 // flags
1618 ldn->up = true;
1619 ldn->fromRemote = true;
1620 ldn->relayed = true;
1621
1622 // parameters
1623 ldn->service = overlayMsg->getService();
1624 ldn->listener = getListener(ldn->service);
1625 ldn->remoteNode = overlayMsg->getSourceNode();
1626 ldn->remoteLink = overlayMsg->getSourceLink();
1627
1628 // update time-stamps
1629 ldn->setAlive();
1630 ldn->setAutoUsed();
1631
1632 // create reply message and send back!
1633 overlayMsg->swapRoles(); // swap source/destination
1634 overlayMsg->setType(OverlayMsg::typeLinkReply);
1635 overlayMsg->setSourceLink(ldn->overlayId);
1636 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1637 overlayMsg->setRelayed(true);
1638 send( overlayMsg, ld ); // send back to link
1639
1640 // inform listener
1641 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1642
1643 return true;
1644}
1645
1646bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1647
1648 // find link request
1649 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1650
1651 // not found? yes-> drop with error!
1652 if (ldn == NULL) {
1653 logging_error( "No link request pending for "
1654 << overlayMsg->getDestinationLink().toString() );
1655 return false;
1656 }
1657 logging_debug("Handling link reply for " << ldn )
1658
1659 // check if already up
1660 if (ldn->up) {
1661 logging_warn( "Link already up: " << ldn );
1662 return true;
1663 }
1664
1665 // debug message
1666 logging_debug( "Link request reply received. Establishing link"
1667 << " for service " << overlayMsg->getService().toString()
1668 << " with local id=" << overlayMsg->getDestinationLink()
1669 << " and remote link id=" << overlayMsg->getSourceLink()
1670 << " to " << overlayMsg->getSourceEndpoint().toString()
1671 );
1672
1673 // set local link descriptor data
1674 ldn->up = true;
1675 ldn->relayed = true;
1676 ldn->service = overlayMsg->getService();
1677 ldn->listener = getListener(ldn->service);
1678 ldn->remoteLink = overlayMsg->getSourceLink();
1679 ldn->remoteNode = overlayMsg->getSourceNode();
1680
1681 // update timestamps
1682 ldn->setAlive();
1683 ldn->setAutoUsed();
1684
1685 // auto links: link has been accepted -> send queued messages
1686 if( ldn->messageQueue.size() > 0 ) {
1687 logging_info( "Sending out queued messages on link " <<
1688 ldn->overlayId.toString() );
1689 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1690 sendMessage( msg, ldn->overlayId );
1691 delete msg;
1692 }
1693 ldn->messageQueue.clear();
1694 }
1695
1696 // inform listeners about new link
1697 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1698
1699 // try to replace relay link with direct link
1700 ldn->communicationId =
1701 bc->establishLink( overlayMsg->getSourceEndpoint() );
1702
1703 return true;
1704}
1705
1706/// handle a keep-alive message for a link
1707bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1708 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1709 if ( rld != NULL ) {
1710 logging_debug("Keep-Alive for " <<
1711 overlayMsg->getDestinationLink() );
1712 if (overlayMsg->isRouteRecord())
1713 rld->routeRecord = overlayMsg->getRouteRecord();
1714 rld->setAlive();
1715 return true;
1716 } else {
1717 logging_error("Keep-Alive for "
1718 << overlayMsg->getDestinationLink() << ": link unknown." );
1719 return false;
1720 }
1721}
1722
1723/// handle a direct link message
1724bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1725 logging_debug( "Received direct link replacement request" );
1726
1727 /// get destination overlay link
1728 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1729 if (rld == NULL || ld == NULL) {
1730 logging_error("Direct link replacement: Link "
1731 << overlayMsg->getDestinationLink() << "not found error." );
1732 return false;
1733 }
1734 logging_info( "Received direct link convert notification for " << rld );
1735
1736 // update information
1737 rld->communicationId = ld->communicationId;
1738 rld->communicationUp = true;
1739 rld->relayed = false;
1740
1741 // mark used and alive!
1742 rld->setAlive();
1743 rld->setAutoUsed();
1744
1745 // erase the original descriptor
1746 eraseDescriptor(ld->overlayId);
1747}
1748
1749/// handles an incoming message
1750bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1751 const LinkID bcLink ) {
1752 logging_debug( "Handling message: " << message->toString());
1753
1754 // decapsulate overlay message
1755 OverlayMsg* overlayMsg =
1756 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1757 if( overlayMsg == NULL ) return false;
1758
1759 // increase number of hops
1760 overlayMsg->increaseNumHops();
1761
1762 // refresh relay information
1763 refreshRelayInformation( overlayMsg, ld );
1764
1765 // update route record
1766 overlayMsg->addRouteRecord(nodeId);
1767
1768 // handle dht messages (do not route)
1769 if (overlayMsg->isDHTMessage())
1770 return handleDHTMessage(overlayMsg);
1771
1772 // handle signaling messages (do not route!)
1773 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1774 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1775 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1776 delete overlayMsg;
1777 return true;
1778 }
1779
1780 // message for reached destination? no-> route message
1781 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1782 overlayMsg->getDestinationNode() != nodeId ) {
1783 logging_debug("Routing message "
1784 << " from " << overlayMsg->getSourceNode()
1785 << " to " << overlayMsg->getDestinationNode()
1786 );
1787 route( overlayMsg );
1788 delete overlayMsg;
1789 return true;
1790 }
1791
1792 // handle DHT response messages
1793 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
1794 bool ret = handleDHTMessage(overlayMsg);
1795 delete overlayMsg;
1796 return ret;
1797 }
1798
1799
1800 // handle base overlay message
1801 bool ret = false; // return value
1802 switch ( overlayMsg->getType() ) {
1803
1804 // data transport messages
1805 case OverlayMsg::typeData:
1806 ret = handleData(overlayMsg, ld); break;
1807
1808 // overlay setup messages
1809 case OverlayMsg::typeJoinRequest:
1810 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1811 case OverlayMsg::typeJoinReply:
1812 ret = handleJoinReply(overlayMsg, bcLink ); break;
1813
1814 // link specific messages
1815 case OverlayMsg::typeLinkRequest:
1816 ret = handleLinkRequest(overlayMsg, ld ); break;
1817 case OverlayMsg::typeLinkReply:
1818 ret = handleLinkReply(overlayMsg, ld ); break;
1819 case OverlayMsg::typeLinkUpdate:
1820 ret = handleLinkUpdate(overlayMsg, ld ); break;
1821 case OverlayMsg::typeLinkAlive:
1822 ret = handleLinkAlive(overlayMsg, ld ); break;
1823 case OverlayMsg::typeLinkDirect:
1824 ret = handleLinkDirect(overlayMsg, ld ); break;
1825
1826 // handle unknown message type
1827 default: {
1828 logging_error( "received message in invalid state! don't know " <<
1829 "what to do with this message of type " << overlayMsg->getType() );
1830 ret = false;
1831 break;
1832 }
1833 }
1834
1835 // free overlay message and return value
1836 delete overlayMsg;
1837 return ret;
1838}
1839
1840// ----------------------------------------------------------------------------
1841
1842void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1843
1844 logging_debug( "broadcasting message to all known nodes " <<
1845 "in the overlay from service " + service.toString() );
1846
1847 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1848 OverlayInterface::NodeList::iterator i = nodes.begin();
1849 for(; i != nodes.end(); i++ ) {
1850 if( *i == nodeId) continue; // don't send to ourselfs
1851 sendMessage( message, *i, service );
1852 }
1853}
1854
1855/// return the overlay neighbors
1856vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1857 // the known nodes _can_ also include our node, so we remove ourself
1858 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1859 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1860 if( i != nodes.end() ) nodes.erase( i );
1861 return nodes;
1862}
1863
1864const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1865 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1866 const LinkDescriptor* ld = getDescriptor(lid);
1867 if( ld == NULL ) return NodeID::UNSPECIFIED;
1868 else return ld->remoteNode;
1869}
1870
1871vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1872 vector<LinkID> linkvector;
1873 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1874 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1875 linkvector.push_back( ld->overlayId );
1876 }
1877 }
1878 return linkvector;
1879}
1880
1881
1882void BaseOverlay::onNodeJoin(const NodeID& node) {
1883 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1884 if( i == joiningNodes.end() ) return;
1885
1886 logging_info( "node has successfully joined baseoverlay and overlay structure "
1887 << node.toString() );
1888
1889 joiningNodes.erase( i );
1890}
1891
1892void BaseOverlay::eventFunction() {
1893 stabilizeRelays();
1894 stabilizeLinks();
1895 stabilizeDHT();
1896}
1897
1898
1899// ----------------------------------------------------------------------------
1900
1901/// stabilize DHT state
1902void BaseOverlay::stabilizeDHT() {
1903
1904}
1905
1906// handle DHT messages
1907bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
1908
1909 // decapsulate message
1910 logging_debug("received DHT message");
1911 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
1912
1913 // handle DHT data message
1914 if (msg->getType()==OverlayMsg::typeDHTData) {
1915 const ServiceID& service = msg->getService();
1916 logging_info( "Received DHT data for service " << service.toString() );
1917
1918 // delegate data message
1919 getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
1920 return true;
1921 }
1922
1923 // route message to closest node
1924 if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
1925 logging_debug("Routing DHT message to closest node "
1926 << " from " << msg->getSourceNode()
1927 << " to " << msg->getDestinationNode()
1928 );
1929 route( msg );
1930 delete msg;
1931 return true;
1932 }
1933
1934 // now, we are the closest node...
1935 switch (msg->getType()) {
1936 case OverlayMsg::typeDHTPut: {
1937 BOOST_FOREACH( Data value, dhtMsg->getValues() )
1938 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
1939 break;
1940 }
1941
1942 case OverlayMsg::typeDHTGet: {
1943 logging_info("DHT-Get: key=" << dhtMsg->getKey() );
1944 vector<Data> vect = dht->get(dhtMsg->getKey());
1945 BOOST_FOREACH(const Data& d, vect)
1946 logging_info("DHT-Get: value=" << d);
1947 OverlayMsg omsg(*msg);
1948 omsg.swapRoles();
1949 omsg.setType(OverlayMsg::typeDHTData);
1950 DHTMessage dhtmsg(dhtMsg->getKey(), vect);
1951 omsg.encapsulate(&dhtmsg);
1952 dhtSend(&omsg, omsg.getDestinationNode());
1953 break;
1954 }
1955
1956 case OverlayMsg::typeDHTRemove: {
1957 if (dhtMsg->hasValues()) {
1958 BOOST_FOREACH( Data value, dhtMsg->getValues() )
1959 dht->remove(dhtMsg->getKey(), value );
1960 } else
1961 dht->remove( dhtMsg->getKey() );
1962 break;
1963 }
1964
1965 default:
1966 logging_error("DHT Message type unknown.");
1967 return false;
1968 }
1969 delete msg;
1970 return true;
1971}
1972
1973/// put a value to the DHT with a ttl given in seconds
1974void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) {
1975
1976 logging_info("DHT: putting key=" << key
1977 << " value=" << value
1978 << " ttl=" << ttl
1979 );
1980
1981 // put into local data store (for refreshes)
1982 localDHT->put(key,value,ttl);
1983
1984 // calculate hash
1985 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1986 DHTMessage dhtmsg(key,value);
1987 dhtmsg.setTTL(ttl);
1988
1989 OverlayMsg msg(OverlayMsg::typeDHTPut);
1990 msg.encapsulate( &dhtmsg );
1991 dhtSend(&msg, dest);
1992}
1993
1994/// removes a key value pair from the DHT
1995void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
1996 // remove from local data store
1997 localDHT->remove(key,value);
1998
1999 // calculate hash
2000 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2001 DHTMessage dhtmsg(key,value);
2002
2003 // send message
2004 OverlayMsg msg(OverlayMsg::typeDHTRemove);
2005 msg.encapsulate( &dhtmsg );
2006 dhtSend(&msg, dest);
2007}
2008
2009/// removes all data stored at the given key
2010void BaseOverlay::dhtRemove( const Data& key ) {
2011 // calculate hash
2012 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2013 DHTMessage dhtmsg(key);
2014
2015 // send message
2016 OverlayMsg msg(OverlayMsg::typeDHTRemove);
2017 msg.encapsulate( &dhtmsg );
2018 dhtSend(&msg, dest);
2019}
2020
2021/// requests data stored using key
2022void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
2023 logging_info("DHT: trying to resolve key=" <<
2024 key << " for service=" << service.toString() );
2025
2026 // calculate hash
2027 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2028 DHTMessage dhtmsg(key);
2029
2030 // send message
2031 OverlayMsg msg(OverlayMsg::typeDHTGet);
2032 msg.setService(service);
2033 msg.encapsulate( &dhtmsg );
2034 dhtSend(&msg, dest);
2035}
2036
2037void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {
2038 logging_info("DHT: sending message with key=" << dest.toString() );
2039 msg->setSourceNode(this->nodeId);
2040 msg->setDestinationNode(dest);
2041
2042 // local storage? yes-> put into DHT directly
2043 if (overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
2044 Data d = data_serialize(msg);
2045 Message* m2 = new Message(d);
2046 OverlayMsg* m3 = m2->decapsulate<OverlayMsg>();
2047 handleDHTMessage(m3);
2048 delete m2;
2049 return;
2050 }
2051
2052 // send message "normally"
2053 send(msg, dest);
2054}
2055
2056
2057}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.