source: source/ariba/overlay/BaseOverlay.cpp@ 6760

Last change on this file since 6760 was 6760, checked in by mies, 14 years ago

added dht test case

File size: 57.9 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/DHTMessage.h"
54#include "ariba/overlay/messages/JoinRequest.h"
55#include "ariba/overlay/messages/JoinReply.h"
56
57#include "ariba/utility/visual/OvlVis.h"
58
59namespace ariba {
60namespace overlay {
61
62class ValueEntry {
63public:
64 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),
65 last_change(time(NULL)), value(value.clone()) {
66 }
67
68 ~ValueEntry() {
69 value.release();
70 }
71
72 void refresh() {
73 last_update = time(NULL);
74 }
75
76 void set_value( const Data& value ) {
77 this->value.release();
78 this->value = value.clone();
79 this->last_change = time(NULL);
80 this->last_update = time(NULL);
81 }
82
83 Data get_value() const {
84 return value;
85 }
86
87 uint16_t get_ttl() const {
88 return ttl;
89 }
90
91 void set_ttl( uint16_t ttl ) {
92 this->ttl = ttl;
93 }
94
95 bool is_ttl_elapsed() const {
96 // is persistent? yes-> always return false
97 if (ttl==0) return false;
98
99 // return true, if ttl is elapsed
100 return ( difftime( time(NULL), this->last_update ) >= ttl );
101 }
102
103private:
104 uint16_t ttl;
105 time_t last_update;
106 time_t last_change;
107 Data value;
108};
109
110class DHTEntry {
111public:
112 Data key;
113 vector<ValueEntry> values;
114
115 vector<Data> get_values() {
116 vector<Data> vect;
117 BOOST_FOREACH( ValueEntry& e, values )
118 vect.push_back( e.get_value() );
119 return vect;
120 }
121};
122
123class DHT {
124public:
125 typedef vector<DHTEntry> Entries;
126 typedef vector<ValueEntry> Values;
127 Entries entries;
128
129 static bool equals( const Data& lhs, const Data& rhs ) {
130 if (rhs.getLength()!=lhs.getLength()) return false;
131 for (int i=0; i<lhs.getLength()/8; i++)
132 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;
133 return true;
134 }
135
136 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {
137
138 // find entry
139 for (size_t i=0; i<entries.size(); i++) {
140 DHTEntry& entry = entries.at(i);
141
142 // check if key is already known
143 if ( equals(entry.key, key) ) {
144
145 // check if value is already in values list
146 for (size_t j=0; j<entry.values.size(); j++) {
147 // found value already? yes-> refresh ttl
148 if ( equals(entry.values[j].get_value(), value) ) {
149 entry.values[j].refresh();
150 std::cout << "DHT: Republished value. Refreshing TTL."
151 << std::endl;
152 return;
153 }
154 }
155
156 // new value-> add to entry
157 std::cout << "DHT: Added value to "
158 << " key=" << key << " with value=" << value << std::endl;
159 entry.values.push_back( ValueEntry( value ) );
160 entry.values.back().set_ttl(ttl);
161 return;
162 }
163 }
164
165 // key is unknown-> add key value pair
166 std::cout << "DHT: New key value pair "
167 << " key=" << key << " with value=" << value << std::endl;
168
169 // add new entry
170 entries.push_back( DHTEntry() );
171 DHTEntry& entry = entries.back();
172 entry.key = key.clone();
173 entry.values.push_back( ValueEntry(value) );
174 entry.values.back().set_ttl(ttl);
175 }
176
177 vector<Data> get( const Data& key ) {
178 // find entry
179 for (size_t i=0; i<entries.size(); i++) {
180 DHTEntry& entry = entries.at(i);
181 if ( equals(entry.key,key) )
182 return entry.get_values();
183 }
184 return vector<Data>();
185 }
186
187 bool remove( const Data& key ) {
188 // find entry
189 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
190 DHTEntry& entry = *i;
191
192 // found? yes-> delete entry
193 if ( equals(entry.key, key) ) {
194 i = entries.erase(i);
195 return true;
196 }
197 }
198 return false;
199 }
200
201 bool remove( const Data& key, const Data& value ) {
202 // find entry
203 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
204 DHTEntry& entry = *i;
205
206 // found? yes-> try to find value
207 if ( equals(entry.key, key) ) {
208 for (Values::iterator j = entry.values.begin();
209 j != entry.values.end(); j++) {
210
211 // value found? yes-> delete
212 if (equals(j->get_value(), value)) {
213 j = entry.values.erase(j);
214 return true;
215 }
216 }
217 }
218 }
219 return false;
220 }
221
222 void cleanup() {
223 // find entry
224 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {
225 DHTEntry& entry = *i;
226
227 for (Values::iterator j = entry.values.begin();
228 j != entry.values.end(); j++) {
229
230 // value found? yes-> delete
231 if (j->is_ttl_elapsed())
232 j = entry.values.erase(j);
233 }
234
235 if (entry.values.size()==0) i = entries.erase(i);
236 }
237 }
238};
239
240// ----------------------------------------------------------------------------
241
242/* *****************************************************************************
243 * PREREQUISITES
244 * ****************************************************************************/
245
246CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
247 if( !communicationListeners.contains( service ) ) {
248 logging_error( "No listener found for service " << service.toString() );
249 return NULL;
250 }
251 CommunicationListener* listener = communicationListeners.get( service );
252 assert( listener != NULL );
253 return listener;
254}
255
256// link descriptor handling ----------------------------------------------------
257
258LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
259 BOOST_FOREACH( LinkDescriptor* lp, links )
260 if ((communication ? lp->communicationId : lp->overlayId) == link)
261 return lp;
262 return NULL;
263}
264
265const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
266 BOOST_FOREACH( const LinkDescriptor* lp, links )
267 if ((communication ? lp->communicationId : lp->overlayId) == link)
268 return lp;
269 return NULL;
270}
271
272/// erases a link descriptor
273void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
274 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
275 LinkDescriptor* ld = *i;
276 if ((communication ? ld->communicationId : ld->overlayId) == link) {
277 delete ld;
278 links.erase(i);
279 break;
280 }
281 }
282}
283
284/// adds a link descriptor
285LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
286 LinkDescriptor* desc = getDescriptor( link );
287 if ( desc == NULL ) {
288 desc = new LinkDescriptor();
289 if (!link.isUnspecified()) desc->overlayId = link;
290 links.push_back(desc);
291 }
292 return desc;
293}
294
295/// returns a auto-link descriptor
296LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
297 // search for a descriptor that is already up
298 BOOST_FOREACH( LinkDescriptor* lp, links )
299 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
300 return lp;
301 // if not found, search for one that is about to come up...
302 BOOST_FOREACH( LinkDescriptor* lp, links )
303 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
304 return lp;
305 return NULL;
306}
307
308/// stabilizes link information
309void BaseOverlay::stabilizeLinks() {
310 // send keep-alive messages over established links
311 BOOST_FOREACH( LinkDescriptor* ld, links ) {
312 if (!ld->up) continue;
313 OverlayMsg msg( OverlayMsg::typeLinkAlive,
314 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
315 if (ld->relayed) msg.setRouteRecord(true);
316 send_link( &msg, ld->overlayId );
317 }
318
319 // iterate over all links and check for time boundaries
320 vector<LinkDescriptor*> oldlinks;
321 time_t now = time(NULL);
322 BOOST_FOREACH( LinkDescriptor* ld, links ) {
323
324 // keep alives and not up? yes-> link connection request is stale!
325 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
326
327 // increase counter
328 ld->keepAliveMissed++;
329
330 // missed more than four keep-alive messages (10 sec)? -> drop link
331 if (ld->keepAliveMissed > 4) {
332 logging_info( "Link connection request is stale, closing: " << ld );
333 oldlinks.push_back( ld );
334 continue;
335 }
336 }
337
338 if (!ld->up) continue;
339
340 // remote used as relay flag
341 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
342 ld->relaying = false;
343
344 // drop links that are dropped and not used as relay
345 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
346 oldlinks.push_back( ld );
347 continue;
348 }
349
350 // auto-link time exceeded?
351 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
352 oldlinks.push_back( ld );
353 continue;
354 }
355
356 // keep alives missed? yes->
357 if ( difftime( now, ld->keepAliveTime ) > 2 ) {
358
359 // increase counter
360 ld->keepAliveMissed++;
361
362 // missed more than four keep-alive messages (4 sec)? -> drop link
363 if (ld->keepAliveMissed >= 4) {
364 logging_info( "Link is stale, closing: " << ld );
365 oldlinks.push_back( ld );
366 continue;
367 }
368 }
369 }
370
371 // drop links
372 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
373 logging_info( "Link timed out. Dropping " << ld );
374 ld->relaying = false;
375 dropLink( ld->overlayId );
376 }
377
378 // show link state
379 counter++;
380 if (counter>=4) showLinks();
381 if (counter>=4 || counter<0) counter = 0;
382}
383
384
385std::string BaseOverlay::getLinkHTMLInfo() {
386 std::ostringstream s;
387 vector<NodeID> nodes;
388 if (links.size()==0) {
389 s << "<h2 style=\"color=#606060\">No links established!</h2>";
390 } else {
391 s << "<h2 style=\"color=#606060\">Links</h2>";
392 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
393 s << "<tr style=\"background-color=#ffe0e0\">";
394 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
395 s << "</tr>";
396
397 int i=0;
398 BOOST_FOREACH( LinkDescriptor* ld, links ) {
399 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
400 bool found = false;
401 BOOST_FOREACH(NodeID& id, nodes)
402 if (id == ld->remoteNode) found = true;
403 if (found) continue;
404 i++;
405 nodes.push_back(ld->remoteNode);
406 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
407 else s << "<tr>";
408 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
409 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
410 s << "<td>";
411 if (ld->routeRecord.size()>1 && ld->relayed) {
412 for (size_t i=1; i<ld->routeRecord.size(); i++)
413 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
414 } else {
415 s << "Direct";
416 }
417 s << "</td>";
418 s << "</tr>";
419 }
420 s << "</table>";
421 }
422 return s.str();
423}
424
425/// shows the current link state
426void BaseOverlay::showLinks() {
427 int i=0;
428 logging_info("--- link state -------------------------------");
429 BOOST_FOREACH( LinkDescriptor* ld, links ) {
430 logging_info("link " << i << ": " << ld);
431 i++;
432 }
433 logging_info("----------------------------------------------");
434}
435
436/// compares two arbitrary links to the same node
437int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
438 LinkDescriptor* lhsld = getDescriptor(lhs);
439 LinkDescriptor* rhsld = getDescriptor(rhs);
440 if (lhsld==NULL || rhsld==NULL
441 || !lhsld->up || !rhsld->up
442 || lhsld->remoteNode != rhsld->remoteNode) return -1;
443
444 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
445 return -1;
446
447 return 1;
448}
449
450
451// internal message delivery ---------------------------------------------------
452
453/// routes a message to its destination node
454void BaseOverlay::route( OverlayMsg* message ) {
455
456 // exceeded time-to-live? yes-> drop message
457 if (message->getNumHops() > message->getTimeToLive()) {
458 logging_warn("Message exceeded TTL. Dropping message and relay routes"
459 "for recovery.");
460 removeRelayNode(message->getDestinationNode());
461 return;
462 }
463
464 // no-> forward message
465 else {
466 // destinastion myself? yes-> handle message
467 if (message->getDestinationNode() == nodeId) {
468 logging_warn("Usually I should not route messages to myself!");
469 Message msg;
470 msg.encapsulate(message);
471 handleMessage( &msg, NULL );
472 } else {
473 // no->send message to next hop
474 send( message, message->getDestinationNode() );
475 }
476 }
477}
478
479/// sends a message to another node, delivers it to the base overlay class
480seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
481 LinkDescriptor* next_link = NULL;
482
483 // drop messages to unspecified destinations
484 if (destination.isUnspecified()) return -1;
485
486 // send messages to myself -> handle message and drop warning!
487 if (destination == nodeId) {
488 logging_warn("Sent message to myself. Handling message.")
489 Message msg;
490 msg.encapsulate(message);
491 handleMessage( &msg, NULL );
492 return -1;
493 }
494
495 // use relay path?
496 if (message->isRelayed()) {
497 next_link = getRelayLinkTo( destination );
498 if (next_link != NULL) {
499 next_link->setRelaying();
500 return bc->sendMessage(next_link->communicationId, message);
501 } else {
502 logging_warn("Could not send message. No relay hop found to "
503 << destination)
504 return -1;
505 }
506 }
507
508 // routed message
509 else {
510 // no-> relay path! route over overlay path
511 LinkID next_id = overlayInterface->getNextLinkId( destination );
512 if (next_id.isUnspecified()) {
513 logging_warn("Could not send message. No next hop found to " <<
514 destination );
515 return -1;
516 }
517
518 // get link descriptor, up and running? yes-> send message
519 next_link = getDescriptor(next_id);
520 if (next_link != NULL && next_link->up) {
521 // send message over relayed link
522 return send(message, next_link);
523 }
524
525 // no-> error, dropping message
526 else {
527 logging_warn("Could not send message. Link not known or up");
528 return -1;
529 }
530 }
531
532 // not reached-> fail
533 return -1;
534}
535
536/// send a message using a link descriptor, delivers it to the base overlay class
537seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
538 // check if null
539 if (ldr == NULL) {
540 logging_error("Can not send message to " << message->getDestinationAddress());
541 return -1;
542 }
543
544 // check if up
545 if (!ldr->up && !ignore_down) {
546 logging_error("Can not send message. Link not up:" << ldr );
547 return -1;
548 }
549 LinkDescriptor* ld = NULL;
550
551 // handle relayed link
552 if (ldr->relayed) {
553 logging_debug("Resolving direct link for relayed link to "
554 << ldr->remoteNode);
555 ld = getRelayLinkTo( ldr->remoteNode );
556 if (ld==NULL) {
557 logging_error("No relay path found to link " << ldr );
558 return -1;
559 }
560 ld->setRelaying();
561 message->setRelayed(true);
562 } else
563 ld = ldr;
564
565 // handle direct link
566 if (ld->communicationUp) {
567 logging_debug("send(): Sending message over direct link.");
568 return bc->sendMessage( ld->communicationId, message );
569 } else {
570 logging_error("send(): Could not send message. "
571 "Not a relayed link and direct link is not up.");
572 return -1;
573 }
574 return -1;
575}
576
577seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
578 const ServiceID& service) {
579 message->setSourceNode(nodeId);
580 message->setDestinationNode(remote);
581 message->setService(service);
582 send( message, remote );
583}
584
585seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
586 LinkDescriptor* ld = getDescriptor(link);
587 if (ld==NULL) {
588 logging_error("Cannot find descriptor to link id=" << link.toString());
589 return -1;
590 }
591 message->setSourceNode(nodeId);
592 message->setDestinationNode(ld->remoteNode);
593
594 message->setSourceLink(ld->overlayId);
595 message->setDestinationLink(ld->remoteLink);
596
597 message->setService(ld->service);
598 message->setRelayed(ld->relayed);
599 return send( message, ld, ignore_down );
600}
601
602// relay route management ------------------------------------------------------
603
604/// stabilize relay information
605void BaseOverlay::stabilizeRelays() {
606 vector<relay_route>::iterator i = relay_routes.begin();
607 while (i!=relay_routes.end() ) {
608 relay_route& route = *i;
609 LinkDescriptor* ld = getDescriptor(route.link);
610
611 // relay link still used and alive?
612 if (ld==NULL
613 || !ld->isDirectVital()
614 || difftime(route.used, time(NULL)) > 8) {
615 logging_info("Forgetting relay information to node "
616 << route.node.toString() );
617 i = relay_routes.erase(i);
618 } else
619 i++;
620 }
621}
622
623void BaseOverlay::removeRelayLink( const LinkID& link ) {
624 vector<relay_route>::iterator i = relay_routes.begin();
625 while (i!=relay_routes.end() ) {
626 relay_route& route = *i;
627 if (route.link == link ) i = relay_routes.erase(i); else i++;
628 }
629}
630
631void BaseOverlay::removeRelayNode( const NodeID& remote ) {
632 vector<relay_route>::iterator i = relay_routes.begin();
633 while (i!=relay_routes.end() ) {
634 relay_route& route = *i;
635 if (route.node == remote ) i = relay_routes.erase(i); else i++;
636 }
637}
638
639/// refreshes relay information
640void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
641
642 // handle relayed messages from real links only
643 if (ld == NULL
644 || ld->relayed
645 || message->getSourceNode()==nodeId ) return;
646
647 // update usage information
648 if (message->isRelayed()) {
649 // try to find source node
650 BOOST_FOREACH( relay_route& route, relay_routes ) {
651 // relay route found? yes->
652 if ( route.node == message->getDestinationNode() ) {
653 ld->setRelaying();
654 route.used = time(NULL);
655 }
656 }
657
658 }
659
660 // register relay path
661 if (message->isRegisterRelay()) {
662 // set relaying
663 ld->setRelaying();
664
665 // try to find source node
666 BOOST_FOREACH( relay_route& route, relay_routes ) {
667
668 // relay route found? yes->
669 if ( route.node == message->getSourceNode() ) {
670
671 // refresh timer
672 route.used = time(NULL);
673 LinkDescriptor* rld = getDescriptor(route.link);
674
675 // route has a shorter hop count or old link is dead? yes-> replace
676 if (route.hops > message->getNumHops()
677 || rld == NULL
678 || !rld->isDirectVital()) {
679 logging_info("Updating relay information to node "
680 << route.node.toString()
681 << " reducing to " << message->getNumHops() << " hops.");
682 route.hops = message->getNumHops();
683 route.link = ld->overlayId;
684 }
685 return;
686 }
687 }
688
689 // not found-> add new entry
690 relay_route route;
691 route.hops = message->getNumHops();
692 route.link = ld->overlayId;
693 route.node = message->getSourceNode();
694 route.used = time(NULL);
695 logging_info("Remembering relay information to node "
696 << route.node.toString());
697 relay_routes.push_back(route);
698 }
699}
700
701/// returns a known "vital" relay link which is up and running
702LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
703 // try to find source node
704 BOOST_FOREACH( relay_route& route, relay_routes ) {
705 if (route.node == remote ) {
706 LinkDescriptor* ld = getDescriptor( route.link );
707 if (ld==NULL || !ld->isDirectVital()) return NULL; else {
708 route.used = time(NULL);
709 return ld;
710 }
711 }
712 }
713 return NULL;
714}
715
716/* *****************************************************************************
717 * PUBLIC MEMBERS
718 * ****************************************************************************/
719
720use_logging_cpp(BaseOverlay);
721
722// ----------------------------------------------------------------------------
723
724BaseOverlay::BaseOverlay() :
725 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
726 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),
727 sideport(&SideportListener::DEFAULT), started(false), counter(0) {
728 dht = new DHT();
729}
730
731BaseOverlay::~BaseOverlay() {
732 delete dht;
733}
734
735// ----------------------------------------------------------------------------
736
737void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
738 logging_info("Starting...");
739
740 // set parameters
741 bc = &_basecomm;
742 nodeId = _nodeid;
743
744 // register at base communication
745 bc->registerMessageReceiver( this );
746 bc->registerEventListener( this );
747
748 // timer for auto link management
749 Timer::setInterval( 1000 );
750 Timer::start();
751
752 started = true;
753 state = BaseOverlayStateInvalid;
754}
755
756void BaseOverlay::stop() {
757 logging_info("Stopping...");
758
759 // stop timer
760 Timer::stop();
761
762 // delete oberlay interface
763 if(overlayInterface != NULL) {
764 delete overlayInterface;
765 overlayInterface = NULL;
766 }
767
768 // unregister at base communication
769 bc->unregisterMessageReceiver( this );
770 bc->unregisterEventListener( this );
771
772 started = false;
773 state = BaseOverlayStateInvalid;
774}
775
776bool BaseOverlay::isStarted(){
777 return started;
778}
779
780// ----------------------------------------------------------------------------
781
782void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
783 const EndpointDescriptor& bootstrapEp) {
784
785 if(id != spovnetId){
786 logging_error("attempt to join against invalid spovnet, call initiate first");
787 return;
788 }
789
790
791 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
792 logging_info( "Starting to join spovnet " << id.toString() <<
793 " with nodeid " << nodeId.toString());
794
795 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
796
797 // bootstrap against ourselfs
798 logging_debug("joining spovnet locally");
799
800 overlayInterface->joinOverlay();
801 state = BaseOverlayStateCompleted;
802 BOOST_FOREACH( NodeListener* i, nodeListeners )
803 i->onJoinCompleted( spovnetId );
804
805 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
806 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
807
808 logging_debug("starting overlay bootstrap module");
809 overlayBootstrap.start(this, spovnetId, nodeId);
810 overlayBootstrap.publish(bc->getEndpointDescriptor());
811
812 } else {
813
814 // bootstrap against another node
815 logging_debug("joining spovnet remotely against " << bootstrapEp.toString());
816
817 const LinkID& lnk = bc->establishLink( bootstrapEp );
818 bootstrapLinks.push_back(lnk);
819 logging_info("join process initiated for " << id.toString() << "...");
820 }
821}
822
823void BaseOverlay::leaveSpoVNet() {
824
825 logging_info( "Leaving spovnet " << spovnetId );
826 bool ret = ( state != this->BaseOverlayStateInvalid );
827
828 logging_debug("stopping overlay bootstrap module");
829 overlayBootstrap.stop();
830 overlayBootstrap.revoke();
831
832 logging_debug( "Dropping all auto-links" );
833
834 // gather all service links
835 vector<LinkID> servicelinks;
836 BOOST_FOREACH( LinkDescriptor* ld, links ) {
837 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
838 servicelinks.push_back( ld->overlayId );
839 }
840
841 // drop all service links
842 BOOST_FOREACH( LinkID lnk, servicelinks )
843 dropLink( lnk );
844
845 // let the node leave the spovnet overlay interface
846 logging_debug( "Leaving overlay" );
847 if( overlayInterface != NULL )
848 overlayInterface->leaveOverlay();
849
850 // drop still open bootstrap links
851 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
852 bc->dropLink( lnk );
853
854 // change to inalid state
855 state = BaseOverlayStateInvalid;
856 //ovl.visShutdown( ovlId, nodeId, string("") );
857
858 // inform all registered services of the event
859 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
860 if( ret ) i->onLeaveCompleted( spovnetId );
861 else i->onLeaveFailed( spovnetId );
862 }
863}
864
865void BaseOverlay::createSpoVNet(const SpoVNetID& id,
866 const OverlayParameterSet& param,
867 const SecurityParameterSet& sec,
868 const QoSParameterSet& qos) {
869
870 // set the state that we are an initiator, this way incoming messages are
871 // handled correctly
872 logging_info( "creating spovnet " + id.toString() <<
873 " with nodeid " << nodeId.toString() );
874
875 spovnetId = id;
876
877 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
878 if( overlayInterface == NULL ) {
879 logging_fatal( "overlay structure not supported" );
880 state = BaseOverlayStateInvalid;
881
882 BOOST_FOREACH( NodeListener* i, nodeListeners )
883 i->onJoinFailed( spovnetId );
884
885 return;
886 }
887}
888
889// ----------------------------------------------------------------------------
890
891const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
892 const NodeID& remoteId, const ServiceID& service ) {
893
894 // establish link via overlay
895 if (!remoteId.isUnspecified())
896 return establishLink( remoteId, service );
897 else
898
899 // establish link directly if only ep is known
900 if (remoteId.isUnspecified())
901 return establishDirectLink(remoteEp, service );
902
903}
904
905/// call base communication's establish link and add link mapping
906const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
907 const ServiceID& service ) {
908
909 /// find a service listener
910 if( !communicationListeners.contains( service ) ) {
911 logging_error( "No listener registered for service id=" << service.toString() );
912 return LinkID::UNSPECIFIED;
913 }
914 CommunicationListener* listener = communicationListeners.get( service );
915 assert( listener != NULL );
916
917 // create descriptor
918 LinkDescriptor* ld = addDescriptor();
919 ld->relayed = false;
920 ld->listener = listener;
921 ld->service = service;
922 ld->communicationId = bc->establishLink( ep );
923
924 /// establish link and add mapping
925 logging_info("Establishing direct link " << ld->communicationId.toString()
926 << " using " << ep.toString());
927
928 return ld->communicationId;
929}
930
931/// establishes a link between two arbitrary nodes
932const LinkID BaseOverlay::establishLink( const NodeID& remote,
933 const ServiceID& service ) {
934
935 // do not establish a link to myself!
936 if (remote == nodeId) return LinkID::UNSPECIFIED;
937
938 // create a link descriptor
939 LinkDescriptor* ld = addDescriptor();
940 ld->relayed = true;
941 ld->remoteNode = remote;
942 ld->service = service;
943 ld->listener = getListener(ld->service);
944
945 // create link request message
946 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
947 msg.setSourceLink(ld->overlayId);
948 msg.setRelayed(true);
949
950 // debug message
951 logging_info(
952 "Sending link request with"
953 << " link=" << ld->overlayId.toString()
954 << " node=" << ld->remoteNode.toString()
955 << " serv=" << ld->service.toString()
956 );
957
958 // sending message to node
959 send_node( &msg, ld->remoteNode, ld->service );
960
961 return ld->overlayId;
962}
963
964/// drops an established link
965void BaseOverlay::dropLink(const LinkID& link) {
966 logging_info( "Dropping link (initiated locally):" << link.toString() );
967
968 // find the link item to drop
969 LinkDescriptor* ld = getDescriptor(link);
970 if( ld == NULL ) {
971 logging_warn( "Can't drop link, link is unknown!");
972 return;
973 }
974
975 // delete all queued messages
976 if( ld->messageQueue.size() > 0 ) {
977 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
978 << ld->messageQueue.size() << " waiting messages" );
979 ld->flushQueue();
980 }
981
982 // inform sideport and listener
983 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
984 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
985
986 // do not drop relay links
987 if (!ld->relaying) {
988 // drop the link in base communication
989 if (ld->communicationUp) bc->dropLink( ld->communicationId );
990
991 // erase descriptor
992 eraseDescriptor( ld->overlayId );
993 } else {
994 ld->dropAfterRelaying = true;
995 }
996}
997
998// ----------------------------------------------------------------------------
999
1000/// internal send message, always use this functions to send messages over links
1001seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
1002 logging_debug( "Sending data message on link " << link.toString() );
1003
1004 // get the mapping for this link
1005 LinkDescriptor* ld = getDescriptor(link);
1006 if( ld == NULL ) {
1007 logging_error("Could not send message. "
1008 << "Link not found id=" << link.toString());
1009 return -1;
1010 }
1011
1012 // check if the link is up yet, if its an auto link queue message
1013 if( !ld->up ) {
1014 ld->setAutoUsed();
1015 if( ld->autolink ) {
1016 logging_info("Auto-link " << link.toString() << " not up, queue message");
1017 Data data = data_serialize( message );
1018 const_cast<Message*>(message)->dropPayload();
1019 ld->messageQueue.push_back( new Message(data) );
1020 } else {
1021 logging_error("Link " << link.toString() << " not up, drop message");
1022 }
1023 return -1;
1024 }
1025
1026 // compile overlay message (has service and node id)
1027 OverlayMsg overmsg( OverlayMsg::typeData );
1028 overmsg.encapsulate( const_cast<Message*>(message) );
1029
1030 // send message over relay/direct/overlay
1031 return send_link( &overmsg, ld->overlayId );
1032}
1033
1034seqnum_t BaseOverlay::sendMessage(const Message* message,
1035 const NodeID& node, const ServiceID& service) {
1036
1037 // find link for node and service
1038 LinkDescriptor* ld = getAutoDescriptor( node, service );
1039
1040 // if we found no link, create an auto link
1041 if( ld == NULL ) {
1042
1043 // debug output
1044 logging_info( "No link to send message to node "
1045 << node.toString() << " found for service "
1046 << service.toString() << ". Creating auto link ..."
1047 );
1048
1049 // call base overlay to create a link
1050 LinkID link = establishLink( node, service );
1051 ld = getDescriptor( link );
1052 if( ld == NULL ) {
1053 logging_error( "Failed to establish auto-link.");
1054 return -1;
1055 }
1056 ld->autolink = true;
1057
1058 logging_debug( "Auto-link establishment in progress to node "
1059 << node.toString() << " with link id=" << link.toString() );
1060 }
1061 assert(ld != NULL);
1062
1063 // mark the link as used, as we now send a message through it
1064 ld->setAutoUsed();
1065
1066 // send / queue message
1067 return sendMessage( message, ld->overlayId );
1068}
1069
1070// ----------------------------------------------------------------------------
1071
1072const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1073 const LinkID& link) const {
1074
1075 // return own end-point descriptor
1076 if( link == LinkID::UNSPECIFIED )
1077 return bc->getEndpointDescriptor();
1078
1079 // find link descriptor. not found -> return unspecified
1080 const LinkDescriptor* ld = getDescriptor(link);
1081 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1082
1083 // return endpoint-descriptor from base communication
1084 return bc->getEndpointDescriptor( ld->communicationId );
1085}
1086
1087const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1088 const NodeID& node) const {
1089
1090 // return own end-point descriptor
1091 if( node == nodeId || node == NodeID::UNSPECIFIED )
1092 return bc->getEndpointDescriptor();
1093
1094 // no joined and request remote descriptor? -> fail!
1095 if( overlayInterface == NULL ) {
1096 logging_error( "overlay interface not set, cannot resolve endpoint" );
1097 return EndpointDescriptor::UNSPECIFIED();
1098 }
1099
1100 // resolve end-point descriptor from the base-overlay routing table
1101 const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1102 if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1103
1104 // see if we can find the node in our own table
1105 BOOST_FOREACH(const LinkDescriptor* ld, links){
1106 if(ld->remoteNode != node) continue;
1107 const EndpointDescriptor& ep = bc->getEndpointDescriptor(ld->communicationId);
1108 if(ep.toString().size()==0) continue;
1109 if(ep != EndpointDescriptor::UNSPECIFIED()) return ep;
1110 }
1111
1112 return EndpointDescriptor::UNSPECIFIED();
1113}
1114
1115// ----------------------------------------------------------------------------
1116
1117bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1118 sideport = _sideport;
1119 _sideport->configure( this );
1120}
1121
1122bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1123 sideport = &SideportListener::DEFAULT;
1124}
1125
1126// ----------------------------------------------------------------------------
1127
1128bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1129 logging_debug( "binding communication listener " << listener
1130 << " on serviceid " << sid.toString() );
1131
1132 if( communicationListeners.contains( sid ) ) {
1133 logging_error( "some listener already registered for service id "
1134 << sid.toString() );
1135 return false;
1136 }
1137
1138 communicationListeners.registerItem( listener, sid );
1139 return true;
1140}
1141
1142
1143bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1144 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1145
1146 if( !communicationListeners.contains( sid ) ) {
1147 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1148 return false;
1149 }
1150
1151 if( communicationListeners.get(sid) != listener ) {
1152 logging_warn( "listener bound to service id " << sid.toString()
1153 << " is different than listener trying to unbind" );
1154 return false;
1155 }
1156
1157 communicationListeners.unregisterItem( sid );
1158 return true;
1159}
1160
1161// ----------------------------------------------------------------------------
1162
1163bool BaseOverlay::bind(NodeListener* listener) {
1164 logging_debug( "Binding node listener " << listener );
1165
1166 // already bound? yes-> warning
1167 NodeListenerVector::iterator i =
1168 find( nodeListeners.begin(), nodeListeners.end(), listener );
1169 if( i != nodeListeners.end() ) {
1170 logging_warn("Node listener " << listener << " is already bound!" );
1171 return false;
1172 }
1173
1174 // no-> add
1175 nodeListeners.push_back( listener );
1176 return true;
1177}
1178
1179bool BaseOverlay::unbind(NodeListener* listener) {
1180 logging_debug( "Unbinding node listener " << listener );
1181
1182 // already unbound? yes-> warning
1183 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1184 if( i == nodeListeners.end() ) {
1185 logging_warn( "Node listener " << listener << " is not bound!" );
1186 return false;
1187 }
1188
1189 // no-> remove
1190 nodeListeners.erase( i );
1191 return true;
1192}
1193
1194// ----------------------------------------------------------------------------
1195
1196void BaseOverlay::onLinkUp(const LinkID& id,
1197 const address_v* local, const address_v* remote) {
1198 logging_debug( "Link up with base communication link id=" << id );
1199
1200 // get descriptor for link
1201 LinkDescriptor* ld = getDescriptor(id, true);
1202
1203 // handle bootstrap link we initiated
1204 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1205 logging_info(
1206 "Join has been initiated by me and the link is now up. " <<
1207 "Sending out join request for SpoVNet " << spovnetId.toString()
1208 );
1209
1210 // send join request message
1211 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1212 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1213 JoinRequest joinRequest( spovnetId, nodeId );
1214 overlayMsg.encapsulate( &joinRequest );
1215 bc->sendMessage( id, &overlayMsg );
1216 return;
1217 }
1218
1219 // no link found? -> link establishment from remote, add one!
1220 if (ld == NULL) {
1221 ld = addDescriptor( id );
1222 logging_info( "onLinkUp (remote request) descriptor: " << ld );
1223
1224 // update descriptor
1225 ld->fromRemote = true;
1226 ld->communicationId = id;
1227 ld->communicationUp = true;
1228 ld->setAutoUsed();
1229 ld->setAlive();
1230
1231 // in this case, do not inform listener, since service it unknown
1232 // -> wait for update message!
1233
1234 // link mapping found? -> send update message with node-id and service id
1235 } else {
1236 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1237
1238 // update descriptor
1239 ld->setAutoUsed();
1240 ld->setAlive();
1241 ld->communicationUp = true;
1242 ld->fromRemote = false;
1243
1244 // if link is a relayed link->convert to direct link
1245 if (ld->relayed) {
1246 logging_info( "Converting to direct link: " << ld );
1247 ld->up = true;
1248 ld->relayed = false;
1249 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1250 overMsg.setSourceLink( ld->overlayId );
1251 overMsg.setDestinationLink( ld->remoteLink );
1252 send_link( &overMsg, ld->overlayId );
1253 } else {
1254 // note: necessary to validate the link on the remote side!
1255 logging_info( "Sending out update" <<
1256 " for service " << ld->service.toString() <<
1257 " with local node id " << nodeId.toString() <<
1258 " on link " << ld->overlayId.toString() );
1259
1260 // compile and send update message
1261 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1262 overlayMsg.setSourceLink(ld->overlayId);
1263 overlayMsg.setAutoLink( ld->autolink );
1264 send_link( &overlayMsg, ld->overlayId, true );
1265 }
1266 }
1267}
1268
1269void BaseOverlay::onLinkDown(const LinkID& id,
1270 const address_v* local, const address_v* remote) {
1271
1272 // erase bootstrap links
1273 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1274 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1275
1276 // get descriptor for link
1277 LinkDescriptor* ld = getDescriptor(id, true);
1278 if ( ld == NULL ) return; // not found? ->ignore!
1279 logging_info( "onLinkDown descriptor: " << ld );
1280
1281 // removing relay link information
1282 removeRelayLink(ld->overlayId);
1283
1284 // inform listeners about link down
1285 ld->communicationUp = false;
1286 if (!ld->service.isUnspecified()) {
1287 getListener(ld->service)->onLinkDown( ld->overlayId, ld->remoteNode );
1288 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1289 }
1290
1291 // delete all queued messages (auto links)
1292 if( ld->messageQueue.size() > 0 ) {
1293 logging_warn( "Dropping link " << id.toString() << " that has "
1294 << ld->messageQueue.size() << " waiting messages" );
1295 ld->flushQueue();
1296 }
1297
1298 // erase mapping
1299 eraseDescriptor(ld->overlayId);
1300}
1301
1302void BaseOverlay::onLinkChanged(const LinkID& id,
1303 const address_v* oldlocal, const address_v* newlocal,
1304 const address_v* oldremote, const address_v* newremote) {
1305
1306 // get descriptor for link
1307 LinkDescriptor* ld = getDescriptor(id, true);
1308 if ( ld == NULL ) return; // not found? ->ignore!
1309 logging_debug( "onLinkChanged descriptor: " << ld );
1310
1311 // inform listeners
1312 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1313 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1314
1315 // autolinks: refresh timestamp
1316 ld->setAutoUsed();
1317}
1318
1319void BaseOverlay::onLinkFail(const LinkID& id,
1320 const address_v* local, const address_v* remote) {
1321 logging_debug( "Link fail with base communication link id=" << id );
1322
1323 // erase bootstrap links
1324 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1325 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1326
1327 // get descriptor for link
1328 LinkDescriptor* ld = getDescriptor(id, true);
1329 if ( ld == NULL ) return; // not found? ->ignore!
1330 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1331
1332 // inform listeners
1333 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1334 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1335}
1336
1337void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1338 const address_v* remote, const QoSParameterSet& qos) {
1339 logging_debug( "Link quality changed with base communication link id=" << id );
1340
1341 // get descriptor for link
1342 LinkDescriptor* ld = getDescriptor(id, true);
1343 if ( ld == NULL ) return; // not found? ->ignore!
1344 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1345}
1346
1347bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1348 const address_v* remote ) {
1349 logging_debug("Accepting link request from " << remote->to_string() );
1350 return true;
1351}
1352
1353/// handles a message from base communication
1354bool BaseOverlay::receiveMessage(const Message* message,
1355 const LinkID& link, const NodeID& ) {
1356 // get descriptor for link
1357 LinkDescriptor* ld = getDescriptor( link, true );
1358 return handleMessage( message, ld, link );
1359}
1360
1361// ----------------------------------------------------------------------------
1362
1363/// Handle spovnet instance join requests
1364bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1365
1366 // decapsulate message
1367 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1368 logging_info( "Received join request for spovnet " <<
1369 joinReq->getSpoVNetID().toString() );
1370
1371 // check spovnet id
1372 if( joinReq->getSpoVNetID() != spovnetId ) {
1373 logging_error(
1374 "Received join request for spovnet we don't handle " <<
1375 joinReq->getSpoVNetID().toString() );
1376 return false;
1377 }
1378
1379 // TODO: here you can implement mechanisms to deny joining of a node
1380 bool allow = true;
1381 logging_info( "Sending join reply for spovnet " <<
1382 spovnetId.toString() << " to node " <<
1383 overlayMsg->getSourceNode().toString() <<
1384 ". Result: " << (allow ? "allowed" : "denied") );
1385 joiningNodes.push_back( overlayMsg->getSourceNode() );
1386
1387 // return overlay parameters
1388 assert( overlayInterface != NULL );
1389 logging_debug( "Using bootstrap end-point "
1390 << getEndpointDescriptor().toString() )
1391 OverlayParameterSet parameters = overlayInterface->getParameters();
1392 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1393 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1394 JoinReply replyMsg( spovnetId, parameters,
1395 allow, getEndpointDescriptor() );
1396 retmsg.encapsulate(&replyMsg);
1397 bc->sendMessage( bcLink, &retmsg );
1398
1399 return true;
1400}
1401
1402/// Handle replies to spovnet instance join requests
1403bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1404 // decapsulate message
1405 logging_debug("received join reply message");
1406 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1407
1408 // correct spovnet?
1409 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1410 logging_error( "Received SpoVNet join reply for " <<
1411 replyMsg->getSpoVNetID().toString() <<
1412 " != " << spovnetId.toString() );
1413 delete replyMsg;
1414 return false;
1415 }
1416
1417 // access granted? no -> fail
1418 if( !replyMsg->getJoinAllowed() ) {
1419 logging_error( "Our join request has been denied" );
1420
1421 // drop initiator link
1422 if( !bcLink.isUnspecified() ){
1423 bc->dropLink( bcLink );
1424
1425 vector<LinkID>::iterator it = std::find(
1426 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1427 if( it != bootstrapLinks.end() )
1428 bootstrapLinks.erase(it);
1429 }
1430
1431 // inform all registered services of the event
1432 BOOST_FOREACH( NodeListener* i, nodeListeners )
1433 i->onJoinFailed( spovnetId );
1434
1435 delete replyMsg;
1436 return true;
1437 }
1438
1439 // access has been granted -> continue!
1440 logging_info("Join request has been accepted for spovnet " <<
1441 spovnetId.toString() );
1442
1443 logging_debug( "Using bootstrap end-point "
1444 << replyMsg->getBootstrapEndpoint().toString() );
1445
1446 // create overlay structure from spovnet parameter set
1447 // if we have not boostrapped yet against some other node
1448 if( overlayInterface == NULL ){
1449
1450 logging_debug("first-time bootstrapping");
1451
1452 overlayInterface = OverlayFactory::create(
1453 *this, replyMsg->getParam(), nodeId, this );
1454
1455 // overlay structure supported? no-> fail!
1456 if( overlayInterface == NULL ) {
1457 logging_error( "overlay structure not supported" );
1458
1459 if( !bcLink.isUnspecified() ){
1460 bc->dropLink( bcLink );
1461
1462 vector<LinkID>::iterator it = std::find(
1463 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1464 if( it != bootstrapLinks.end() )
1465 bootstrapLinks.erase(it);
1466 }
1467
1468 // inform all registered services of the event
1469 BOOST_FOREACH( NodeListener* i, nodeListeners )
1470 i->onJoinFailed( spovnetId );
1471
1472 delete replyMsg;
1473 return true;
1474 }
1475
1476 // everything ok-> join the overlay!
1477 state = BaseOverlayStateCompleted;
1478 overlayInterface->createOverlay();
1479
1480 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1481 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1482
1483 // update ovlvis
1484 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1485
1486 // inform all registered services of the event
1487 BOOST_FOREACH( NodeListener* i, nodeListeners )
1488 i->onJoinCompleted( spovnetId );
1489
1490 delete replyMsg;
1491
1492 } else {
1493
1494 // this is not the first bootstrap, just join the additional node
1495 logging_debug("not first-time bootstrapping");
1496 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1497 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1498
1499 delete replyMsg;
1500
1501 } // if( overlayInterface == NULL )
1502
1503 return true;
1504}
1505
1506
1507bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1508 // get service
1509 const ServiceID& service = overlayMsg->getService();
1510 logging_debug( "Received data for service " << service.toString()
1511 << " on link " << overlayMsg->getDestinationLink().toString() );
1512
1513 // delegate data message
1514 getListener(service)->onMessage(
1515 overlayMsg,
1516 overlayMsg->getSourceNode(),
1517 overlayMsg->getDestinationLink()
1518 );
1519
1520 return true;
1521}
1522
1523
1524bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1525
1526 if( ld == NULL ) {
1527 logging_warn( "received overlay update message for link for "
1528 << "which we have no mapping" );
1529 return false;
1530 }
1531 logging_info("Received type update message on link " << ld );
1532
1533 // update our link mapping information for this link
1534 bool changed =
1535 ( ld->remoteNode != overlayMsg->getSourceNode() )
1536 || ( ld->service != overlayMsg->getService() );
1537
1538 // set parameters
1539 ld->up = true;
1540 ld->remoteNode = overlayMsg->getSourceNode();
1541 ld->remoteLink = overlayMsg->getSourceLink();
1542 ld->service = overlayMsg->getService();
1543 ld->autolink = overlayMsg->isAutoLink();
1544
1545 // if our link information changed, we send out an update, too
1546 if( changed ) {
1547 overlayMsg->swapRoles();
1548 overlayMsg->setSourceNode(nodeId);
1549 overlayMsg->setSourceLink(ld->overlayId);
1550 overlayMsg->setService(ld->service);
1551 send( overlayMsg, ld );
1552 }
1553
1554 // service registered? no-> error!
1555 if( !communicationListeners.contains( ld->service ) ) {
1556 logging_warn( "Link up: event listener has not been registered" );
1557 return false;
1558 }
1559
1560 // default or no service registered?
1561 CommunicationListener* listener = communicationListeners.get( ld->service );
1562 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1563 logging_warn("Link up: event listener is default or null!" );
1564 return true;
1565 }
1566
1567 // update descriptor
1568 ld->listener = listener;
1569 ld->setAutoUsed();
1570 ld->setAlive();
1571
1572 // ask the service whether it wants to accept this link
1573 if( !listener->onLinkRequest(ld->remoteNode) ) {
1574
1575 logging_debug("Link id=" << ld->overlayId.toString() <<
1576 " has been denied by service " << ld->service.toString() << ", dropping link");
1577
1578 // prevent onLinkDown calls to the service
1579 ld->listener = &CommunicationListener::DEFAULT;
1580
1581 // drop the link
1582 dropLink( ld->overlayId );
1583 return true;
1584 }
1585
1586 // set link up
1587 ld->up = true;
1588 logging_info( "Link has been accepted by service and is up: " << ld );
1589
1590 // auto links: link has been accepted -> send queued messages
1591 if( ld->messageQueue.size() > 0 ) {
1592 logging_info( "Sending out queued messages on link " << ld );
1593 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1594 sendMessage( msg, ld->overlayId );
1595 delete msg;
1596 }
1597 ld->messageQueue.clear();
1598 }
1599
1600 // call the notification functions
1601 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1602 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1603
1604 return true;
1605}
1606
1607/// handle a link request and reply
1608bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1609 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1610
1611 //TODO: Check if a request has already been sent using getSourceLink() ...
1612
1613 // create link descriptor
1614 LinkDescriptor* ldn = addDescriptor();
1615
1616 // flags
1617 ldn->up = true;
1618 ldn->fromRemote = true;
1619 ldn->relayed = true;
1620
1621 // parameters
1622 ldn->service = overlayMsg->getService();
1623 ldn->listener = getListener(ldn->service);
1624 ldn->remoteNode = overlayMsg->getSourceNode();
1625 ldn->remoteLink = overlayMsg->getSourceLink();
1626
1627 // update time-stamps
1628 ldn->setAlive();
1629 ldn->setAutoUsed();
1630
1631 // create reply message and send back!
1632 overlayMsg->swapRoles(); // swap source/destination
1633 overlayMsg->setType(OverlayMsg::typeLinkReply);
1634 overlayMsg->setSourceLink(ldn->overlayId);
1635 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1636 overlayMsg->setRelayed(true);
1637 send( overlayMsg, ld ); // send back to link
1638
1639 // inform listener
1640 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1641
1642 return true;
1643}
1644
1645bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1646
1647 // find link request
1648 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1649
1650 // not found? yes-> drop with error!
1651 if (ldn == NULL) {
1652 logging_error( "No link request pending for "
1653 << overlayMsg->getDestinationLink().toString() );
1654 return false;
1655 }
1656 logging_debug("Handling link reply for " << ldn )
1657
1658 // check if already up
1659 if (ldn->up) {
1660 logging_warn( "Link already up: " << ldn );
1661 return true;
1662 }
1663
1664 // debug message
1665 logging_debug( "Link request reply received. Establishing link"
1666 << " for service " << overlayMsg->getService().toString()
1667 << " with local id=" << overlayMsg->getDestinationLink()
1668 << " and remote link id=" << overlayMsg->getSourceLink()
1669 << " to " << overlayMsg->getSourceEndpoint().toString()
1670 );
1671
1672 // set local link descriptor data
1673 ldn->up = true;
1674 ldn->relayed = true;
1675 ldn->service = overlayMsg->getService();
1676 ldn->listener = getListener(ldn->service);
1677 ldn->remoteLink = overlayMsg->getSourceLink();
1678 ldn->remoteNode = overlayMsg->getSourceNode();
1679
1680 // update timestamps
1681 ldn->setAlive();
1682 ldn->setAutoUsed();
1683
1684 // auto links: link has been accepted -> send queued messages
1685 if( ldn->messageQueue.size() > 0 ) {
1686 logging_info( "Sending out queued messages on link " <<
1687 ldn->overlayId.toString() );
1688 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1689 sendMessage( msg, ldn->overlayId );
1690 delete msg;
1691 }
1692 ldn->messageQueue.clear();
1693 }
1694
1695 // inform listeners about new link
1696 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1697
1698 // try to replace relay link with direct link
1699 ldn->communicationId =
1700 bc->establishLink( overlayMsg->getSourceEndpoint() );
1701
1702 return true;
1703}
1704
1705/// handle a keep-alive message for a link
1706bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1707 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1708 if ( rld != NULL ) {
1709 logging_debug("Keep-Alive for " <<
1710 overlayMsg->getDestinationLink() );
1711 if (overlayMsg->isRouteRecord())
1712 rld->routeRecord = overlayMsg->getRouteRecord();
1713 rld->setAlive();
1714 return true;
1715 } else {
1716 logging_error("Keep-Alive for "
1717 << overlayMsg->getDestinationLink() << ": link unknown." );
1718 return false;
1719 }
1720}
1721
1722/// handle a direct link message
1723bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1724 logging_debug( "Received direct link replacement request" );
1725
1726 /// get destination overlay link
1727 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1728 if (rld == NULL || ld == NULL) {
1729 logging_error("Direct link replacement: Link "
1730 << overlayMsg->getDestinationLink() << "not found error." );
1731 return false;
1732 }
1733 logging_info( "Received direct link convert notification for " << rld );
1734
1735 // update information
1736 rld->communicationId = ld->communicationId;
1737 rld->communicationUp = true;
1738 rld->relayed = false;
1739
1740 // mark used and alive!
1741 rld->setAlive();
1742 rld->setAutoUsed();
1743
1744 // erase the original descriptor
1745 eraseDescriptor(ld->overlayId);
1746}
1747
1748/// handles an incoming message
1749bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1750 const LinkID bcLink ) {
1751 logging_debug( "Handling message: " << message->toString());
1752
1753 // decapsulate overlay message
1754 OverlayMsg* overlayMsg =
1755 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1756 if( overlayMsg == NULL ) return false;
1757
1758 // increase number of hops
1759 overlayMsg->increaseNumHops();
1760
1761 // refresh relay information
1762 refreshRelayInformation( overlayMsg, ld );
1763
1764 // update route record
1765 overlayMsg->addRouteRecord(nodeId);
1766
1767 // handle dht messages (do not route)
1768 if (overlayMsg->isDHTMessage())
1769 return handleDHTMessage(overlayMsg);
1770
1771 // handle signaling messages (do not route!)
1772 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1773 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1774 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1775 delete overlayMsg;
1776 return true;
1777 }
1778
1779 // message for reached destination? no-> route message
1780 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1781 overlayMsg->getDestinationNode() != nodeId ) {
1782 logging_debug("Routing message "
1783 << " from " << overlayMsg->getSourceNode()
1784 << " to " << overlayMsg->getDestinationNode()
1785 );
1786 route( overlayMsg );
1787 delete overlayMsg;
1788 return true;
1789 }
1790
1791 // handle DHT response messages
1792 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {
1793 bool ret = handleDHTMessage(overlayMsg);
1794 delete overlayMsg;
1795 return ret;
1796 }
1797
1798
1799 // handle base overlay message
1800 bool ret = false; // return value
1801 switch ( overlayMsg->getType() ) {
1802
1803 // data transport messages
1804 case OverlayMsg::typeData:
1805 ret = handleData(overlayMsg, ld); break;
1806
1807 // overlay setup messages
1808 case OverlayMsg::typeJoinRequest:
1809 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1810 case OverlayMsg::typeJoinReply:
1811 ret = handleJoinReply(overlayMsg, bcLink ); break;
1812
1813 // link specific messages
1814 case OverlayMsg::typeLinkRequest:
1815 ret = handleLinkRequest(overlayMsg, ld ); break;
1816 case OverlayMsg::typeLinkReply:
1817 ret = handleLinkReply(overlayMsg, ld ); break;
1818 case OverlayMsg::typeLinkUpdate:
1819 ret = handleLinkUpdate(overlayMsg, ld ); break;
1820 case OverlayMsg::typeLinkAlive:
1821 ret = handleLinkAlive(overlayMsg, ld ); break;
1822 case OverlayMsg::typeLinkDirect:
1823 ret = handleLinkDirect(overlayMsg, ld ); break;
1824
1825 // handle unknown message type
1826 default: {
1827 logging_error( "received message in invalid state! don't know " <<
1828 "what to do with this message of type " << overlayMsg->getType() );
1829 ret = false;
1830 break;
1831 }
1832 }
1833
1834 // free overlay message and return value
1835 delete overlayMsg;
1836 return ret;
1837}
1838
1839// ----------------------------------------------------------------------------
1840
1841void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1842
1843 logging_debug( "broadcasting message to all known nodes " <<
1844 "in the overlay from service " + service.toString() );
1845
1846 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1847 OverlayInterface::NodeList::iterator i = nodes.begin();
1848 for(; i != nodes.end(); i++ ) {
1849 if( *i == nodeId) continue; // don't send to ourselfs
1850 sendMessage( message, *i, service );
1851 }
1852}
1853
1854/// return the overlay neighbors
1855vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1856 // the known nodes _can_ also include our node, so we remove ourself
1857 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1858 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1859 if( i != nodes.end() ) nodes.erase( i );
1860 return nodes;
1861}
1862
1863const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1864 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1865 const LinkDescriptor* ld = getDescriptor(lid);
1866 if( ld == NULL ) return NodeID::UNSPECIFIED;
1867 else return ld->remoteNode;
1868}
1869
1870vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1871 vector<LinkID> linkvector;
1872 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1873 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1874 linkvector.push_back( ld->overlayId );
1875 }
1876 }
1877 return linkvector;
1878}
1879
1880
1881void BaseOverlay::onNodeJoin(const NodeID& node) {
1882 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1883 if( i == joiningNodes.end() ) return;
1884
1885 logging_info( "node has successfully joined baseoverlay and overlay structure "
1886 << node.toString() );
1887
1888 joiningNodes.erase( i );
1889}
1890
1891void BaseOverlay::eventFunction() {
1892 stabilizeRelays();
1893 stabilizeLinks();
1894 stabilizeDHT();
1895}
1896
1897
1898// ----------------------------------------------------------------------------
1899
1900/// stabilize DHT state
1901void BaseOverlay::stabilizeDHT() {
1902
1903}
1904
1905// handle DHT messages
1906bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {
1907
1908 // decapsulate message
1909 logging_debug("received DHT message");
1910 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();
1911
1912 // handle DHT data message
1913 if (msg->getType()==OverlayMsg::typeDHTData) {
1914 const ServiceID& service = msg->getService();
1915 logging_debug( "Received DHT data for service " << service.toString() );
1916
1917 // delegate data message
1918 getListener(service)->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );
1919 return true;
1920 }
1921
1922 // route message to closest node
1923 if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) {
1924 logging_debug("Routing DHT message to closest node "
1925 << " from " << msg->getSourceNode()
1926 << " to " << msg->getDestinationNode()
1927 );
1928 route( msg );
1929 delete msg;
1930 return true;
1931 }
1932
1933 // now, we are the closest node...
1934 switch (msg->getType()) {
1935 case OverlayMsg::typeDHTPut: {
1936 BOOST_FOREACH( Data value, dhtMsg->getValues() )
1937 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );
1938 break;
1939 }
1940
1941 case OverlayMsg::typeDHTGet: {
1942 vector<Data> vect = dht->get(dhtMsg->getKey());
1943 OverlayMsg omsg(*msg);
1944 omsg.swapRoles();
1945 omsg.setType(OverlayMsg::typeDHTData);
1946 DHTMessage dhtmsg(dhtMsg->getKey(), vect);
1947 omsg.encapsulate(&dhtmsg);
1948 this->send(&omsg, omsg.getDestinationNode());
1949 break;
1950 }
1951
1952 case OverlayMsg::typeDHTRemove: {
1953 if (dhtMsg->hasValues()) {
1954 BOOST_FOREACH( Data value, dhtMsg->getValues() )
1955 dht->remove(dhtMsg->getKey(), value );
1956 } else
1957 dht->remove( dhtMsg->getKey() );
1958 break;
1959 }
1960
1961 default:
1962 logging_error("DHT Message type unknown.");
1963 return false;
1964 }
1965 delete msg;
1966 return true;
1967}
1968
1969/// put a value to the DHT with a ttl given in seconds
1970void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl ) {
1971 // calculate hash
1972 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1973 OverlayMsg msg(OverlayMsg::typeDHTPut);
1974 DHTMessage dhtmsg(key,value);
1975 dhtmsg.setTTL(ttl);
1976 msg.setDestinationNode(dest);
1977 msg.encapsulate( &dhtmsg );
1978 send(&msg, dest);
1979}
1980
1981/// removes a key value pair from the DHT
1982void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {
1983 // calculate hash
1984 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1985 OverlayMsg msg(OverlayMsg::typeDHTRemove);
1986 DHTMessage dhtmsg(key,value);
1987 msg.setDestinationNode(dest);
1988 msg.encapsulate( &dhtmsg );
1989 send(&msg, dest);
1990}
1991
1992/// removes all data stored at the given key
1993void BaseOverlay::dhtRemove( const Data& key ) {
1994 // calculate hash
1995 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
1996 OverlayMsg msg(OverlayMsg::typeDHTRemove);
1997 DHTMessage dhtmsg(key);
1998 msg.setDestinationNode(dest);
1999 msg.encapsulate( &dhtmsg );
2000 send(&msg, dest);
2001}
2002
2003/// requests data stored using key
2004void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {
2005 // calculate hash
2006 NodeID dest = NodeID::sha1(key.getBuffer(), key.getLength() / 8);
2007 OverlayMsg msg(OverlayMsg::typeDHTGet);
2008 DHTMessage dhtmsg(key);
2009 msg.setDestinationNode(dest);
2010 msg.setService(service);
2011 msg.encapsulate( &dhtmsg );
2012 send(&msg, dest);
2013}
2014
2015}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.