close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 10752

Last change on this file since 10752 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 57.3 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59
60namespace ariba {
61namespace overlay {
62
63#define visualInstance ariba::utility::DddVis::instance()
64#define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
65#define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
66
67
68// ----------------------------------------------------------------------------
69
70/* *****************************************************************************
71 * PREREQUESITES
72 * ****************************************************************************/
73
74CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
75 if( !communicationListeners.contains( service ) ) {
76 logging_info( "No listener found for service " << service.toString() );
77 return NULL;
78 }
79 CommunicationListener* listener = communicationListeners.get( service );
80 assert( listener != NULL );
81 return listener;
82}
83
84// link descriptor handling ----------------------------------------------------
85
86LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
87 BOOST_FOREACH( LinkDescriptor* lp, links )
88 if ((communication ? lp->communicationId : lp->overlayId) == link)
89 return lp;
90 return NULL;
91}
92
93const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
94 BOOST_FOREACH( const LinkDescriptor* lp, links )
95 if ((communication ? lp->communicationId : lp->overlayId) == link)
96 return lp;
97 return NULL;
98}
99
100/// erases a link descriptor
101void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
102 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
103 LinkDescriptor* ld = *i;
104 if ((communication ? ld->communicationId : ld->overlayId) == link) {
105 delete ld;
106 links.erase(i);
107 break;
108 }
109 }
110}
111
112/// adds a link descriptor
113LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
114 LinkDescriptor* desc = getDescriptor( link );
115 if ( desc == NULL ) {
116 desc = new LinkDescriptor();
117 if (!link.isUnspecified()) desc->overlayId = link;
118 links.push_back(desc);
119 }
120 return desc;
121}
122
123/// returns a auto-link descriptor
124LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) {
125 // search for a descriptor that is already up
126 BOOST_FOREACH( LinkDescriptor* lp, links )
127 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)
128 return lp;
129 // if not found, search for one that is about to come up...
130 BOOST_FOREACH( LinkDescriptor* lp, links )
131 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 )
132 return lp;
133 return NULL;
134}
135
136/// stabilizes link information
137void BaseOverlay::stabilizeLinks() {
138 // send keep-alive messages over established links
139 BOOST_FOREACH( LinkDescriptor* ld, links ) {
140 if (!ld->up) continue;
141 OverlayMsg msg( OverlayMsg::typeLinkAlive,
142 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
143 if (ld->relayed) msg.setRouteRecord(true);
144 send_link( &msg, ld->overlayId );
145 }
146
147 // iterate over all links and check for time boundaries
148 vector<LinkDescriptor*> oldlinks;
149 time_t now = time(NULL);
150 BOOST_FOREACH( LinkDescriptor* ld, links ) {
151
152 // keep alives and not up? yes-> link connection request is stale!
153 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {
154
155 // increase counter
156 ld->keepAliveMissed++;
157
158 // missed more than four keep-alive messages (10 sec)? -> drop link
159 if (ld->keepAliveMissed > 4) {
160 logging_info( "Link connection request is stale, closing: " << ld );
161 oldlinks.push_back( ld );
162 continue;
163 }
164 }
165
166 if (!ld->up) continue;
167
168 // check if link is relayed and retry connecting directly
169 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
170 ld->retryCounter--;
171 ld->communicationId = bc->establishLink( ld->endpoint );
172 }
173
174 // remote used as relay flag
175 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)
176 ld->relaying = false;
177
178 // drop links that are dropped and not used as relay
179 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
180 oldlinks.push_back( ld );
181 continue;
182 }
183
184 // auto-link time exceeded?
185 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {
186 oldlinks.push_back( ld );
187 continue;
188 }
189
190 // keep alives missed? yes->
191 if ( difftime( now, ld->keepAliveTime ) > 4 ) {
192
193 // increase counter
194 ld->keepAliveMissed++;
195
196 // missed more than four keep-alive messages (4 sec)? -> drop link
197 if (ld->keepAliveMissed >= 2) {
198 logging_info( "Link is stale, closing: " << ld );
199 oldlinks.push_back( ld );
200 continue;
201 }
202 }
203 }
204
205 // drop links
206 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {
207 logging_info( "Link timed out. Dropping " << ld );
208 ld->relaying = false;
209 dropLink( ld->overlayId );
210 }
211
212 // show link state
213 counter++;
214 if (counter>=4) showLinks();
215 if (counter>=4 || counter<0) counter = 0;
216}
217
218
219std::string BaseOverlay::getLinkHTMLInfo() {
220 std::ostringstream s;
221 vector<NodeID> nodes;
222 if (links.size()==0) {
223 s << "<h2 style=\"color=#606060\">No links established!</h2>";
224 } else {
225 s << "<h2 style=\"color=#606060\">Links</h2>";
226 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
227 s << "<tr style=\"background-color=#ffe0e0\">";
228 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
229 s << "</tr>";
230
231 int i=0;
232 BOOST_FOREACH( LinkDescriptor* ld, links ) {
233 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
234 bool found = false;
235 BOOST_FOREACH(NodeID& id, nodes)
236 if (id == ld->remoteNode) found = true;
237 if (found) continue;
238 i++;
239 nodes.push_back(ld->remoteNode);
240 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
241 else s << "<tr>";
242 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
243 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
244 s << "<td>";
245 if (ld->routeRecord.size()>1 && ld->relayed) {
246 for (size_t i=1; i<ld->routeRecord.size(); i++)
247 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
248 } else {
249 s << "Direct";
250 }
251 s << "</td>";
252 s << "</tr>";
253 }
254 s << "</table>";
255 }
256 return s.str();
257}
258
259/// shows the current link state
260void BaseOverlay::showLinks() {
261 int i=0;
262 logging_info("--- link state -------------------------------");
263 BOOST_FOREACH( LinkDescriptor* ld, links ) {
264 string epd = "";
265 if (ld->isDirectVital())
266 epd = getEndpointDescriptor(ld->remoteNode).toString();
267
268 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
269 i++;
270 }
271 logging_info("----------------------------------------------");
272}
273
274/// compares two arbitrary links to the same node
275int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
276 LinkDescriptor* lhsld = getDescriptor(lhs);
277 LinkDescriptor* rhsld = getDescriptor(rhs);
278 if (lhsld==NULL || rhsld==NULL
279 || !lhsld->up || !rhsld->up
280 || lhsld->remoteNode != rhsld->remoteNode) return -1;
281
282 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
283 return -1;
284
285 return 1;
286}
287
288
289// internal message delivery ---------------------------------------------------
290
291/// routes a message to its destination node
292void BaseOverlay::route( OverlayMsg* message ) {
293
294 // exceeded time-to-live? yes-> drop message
295 if (message->getNumHops() > message->getTimeToLive()) {
296 logging_warn("Message exceeded TTL. Dropping message and relay routes"
297 "for recovery.");
298 removeRelayNode(message->getDestinationNode());
299 return;
300 }
301
302 // no-> forward message
303 else {
304 // destinastion myself? yes-> handle message
305 if (message->getDestinationNode() == nodeId) {
306 logging_warn("Usually I should not route messages to myself!");
307 Message msg;
308 msg.encapsulate(message);
309 handleMessage( &msg, NULL );
310 } else {
311 // no->send message to next hop
312 send( message, message->getDestinationNode() );
313 }
314 }
315}
316
317/// sends a message to another node, delivers it to the base overlay class
318seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) {
319 LinkDescriptor* next_link = NULL;
320
321 // drop messages to unspecified destinations
322 if (destination.isUnspecified()) return -1;
323
324 // send messages to myself -> handle message and drop warning!
325 if (destination == nodeId) {
326 logging_warn("Sent message to myself. Handling message.")
327 Message msg;
328 msg.encapsulate(message);
329 handleMessage( &msg, NULL );
330 return -1;
331 }
332
333 // use relay path?
334 if (message->isRelayed()) {
335 next_link = getRelayLinkTo( destination );
336 if (next_link != NULL) {
337 next_link->setRelaying();
338 return bc->sendMessage(next_link->communicationId, message);
339 } else {
340 logging_warn("Could not send message. No relay hop found to "
341 << destination << " -- trying to route over overlay paths ...")
342// logging_error("ERROR: " << debugInformation() );
343 // return -1;
344 }
345 }
346
347 // last resort -> route over overlay path
348 LinkID next_id = overlayInterface->getNextLinkId( destination );
349 if (next_id.isUnspecified()) {
350 logging_warn("Could not send message. No next hop found to " <<
351 destination );
352 logging_error("ERROR: " << debugInformation() );
353 return -1;
354 }
355
356 // get link descriptor, up and running? yes-> send message
357 next_link = getDescriptor(next_id);
358 if (next_link != NULL && next_link->up) {
359 // send message over relayed link
360 return send(message, next_link);
361 }
362
363 // no-> error, dropping message
364 else {
365 logging_warn("Could not send message. Link not known or up");
366 logging_error("ERROR: " << debugInformation() );
367 return -1;
368 }
369
370 // not reached-> fail
371 return -1;
372}
373
374/// send a message using a link descriptor, delivers it to the base overlay class
375seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) {
376 // check if null
377 if (ldr == NULL) {
378 logging_error("Can not send message to " << message->getDestinationAddress());
379 return -1;
380 }
381
382 // check if up
383 if (!ldr->up && !ignore_down) {
384 logging_error("Can not send message. Link not up:" << ldr );
385 logging_error("DEBUG_INFO: " << debugInformation() );
386 return -1;
387 }
388 LinkDescriptor* ld = NULL;
389
390 // handle relayed link
391 if (ldr->relayed) {
392 logging_debug("Resolving direct link for relayed link to "
393 << ldr->remoteNode);
394 ld = getRelayLinkTo( ldr->remoteNode );
395 if (ld==NULL) {
396 logging_error("No relay path found to link " << ldr );
397 logging_error("DEBUG_INFO: " << debugInformation() );
398 return -1;
399 }
400 ld->setRelaying();
401 message->setRelayed(true);
402 } else
403 ld = ldr;
404
405 // handle direct link
406 if (ld->communicationUp) {
407 logging_debug("send(): Sending message over direct link.");
408 return bc->sendMessage( ld->communicationId, message );
409 } else {
410 logging_error("send(): Could not send message. "
411 "Not a relayed link and direct link is not up.");
412 return -1;
413 }
414 return -1;
415}
416
417seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
418 const ServiceID& service) {
419 message->setSourceNode(nodeId);
420 message->setDestinationNode(remote);
421 message->setService(service);
422 return send( message, remote );
423}
424
425seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) {
426 LinkDescriptor* ld = getDescriptor(link);
427 if (ld==NULL) {
428 logging_error("Cannot find descriptor to link id=" << link.toString());
429 return -1;
430 }
431 message->setSourceNode(nodeId);
432 message->setDestinationNode(ld->remoteNode);
433
434 message->setSourceLink(ld->overlayId);
435 message->setDestinationLink(ld->remoteLink);
436
437 message->setService(ld->service);
438 message->setRelayed(ld->relayed);
439 return send( message, ld, ignore_down );
440}
441
442// relay route management ------------------------------------------------------
443
444/// stabilize relay information
445void BaseOverlay::stabilizeRelays() {
446 vector<relay_route>::iterator i = relay_routes.begin();
447 while (i!=relay_routes.end() ) {
448 relay_route& route = *i;
449 LinkDescriptor* ld = getDescriptor(route.link);
450
451 // relay link still used and alive?
452 if (ld==NULL
453 || !ld->isDirectVital()
454 || difftime(route.used, time(NULL)) > 8) {
455 logging_info("Forgetting relay information to node "
456 << route.node.toString() );
457 i = relay_routes.erase(i);
458 } else
459 i++;
460 }
461}
462
463void BaseOverlay::removeRelayLink( const LinkID& link ) {
464 vector<relay_route>::iterator i = relay_routes.begin();
465 while (i!=relay_routes.end() ) {
466 relay_route& route = *i;
467 if (route.link == link ) i = relay_routes.erase(i); else i++;
468 }
469}
470
471void BaseOverlay::removeRelayNode( const NodeID& remote ) {
472 vector<relay_route>::iterator i = relay_routes.begin();
473 while (i!=relay_routes.end() ) {
474 relay_route& route = *i;
475 if (route.node == remote ) i = relay_routes.erase(i); else i++;
476 }
477}
478
479/// refreshes relay information
480void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
481
482 // handle relayed messages from real links only
483 if (ld == NULL
484 || ld->relayed
485 || message->getSourceNode()==nodeId ) return;
486
487 // update usage information
488 if (message->isRelayed()) {
489 // try to find source node
490 BOOST_FOREACH( relay_route& route, relay_routes ) {
491 // relay route found? yes->
492 if ( route.node == message->getDestinationNode() ) {
493 ld->setRelaying();
494 route.used = time(NULL);
495 }
496 }
497
498 }
499
500 // register relay path
501 if (message->isRegisterRelay()) {
502 // set relaying
503 ld->setRelaying();
504
505 // try to find source node
506 BOOST_FOREACH( relay_route& route, relay_routes ) {
507
508 // relay route found? yes->
509 if ( route.node == message->getSourceNode() ) {
510
511 // refresh timer
512 route.used = time(NULL);
513 LinkDescriptor* rld = getDescriptor(route.link);
514
515 // route has a shorter hop count or old link is dead? yes-> replace
516 if (route.hops > message->getNumHops()
517 || rld == NULL
518 || !rld->isDirectVital()) {
519 logging_info("Updating relay information to node "
520 << route.node.toString()
521 << " reducing to " << message->getNumHops() << " hops.");
522 route.hops = message->getNumHops();
523 route.link = ld->overlayId;
524 }
525 return;
526 }
527 }
528
529 // not found-> add new entry
530 relay_route route;
531 route.hops = message->getNumHops();
532 route.link = ld->overlayId;
533 route.node = message->getSourceNode();
534 route.used = time(NULL);
535 logging_info("Remembering relay information to node "
536 << route.node.toString());
537 relay_routes.push_back(route);
538 }
539}
540
541/// returns a known "vital" relay link which is up and running
542LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
543 // try to find source node
544 BOOST_FOREACH( relay_route& route, relay_routes ) {
545 if (route.node == remote ) {
546 LinkDescriptor* ld = getDescriptor( route.link );
547 if (ld==NULL || !ld->isDirectVital()) return NULL; else {
548 route.used = time(NULL);
549 return ld;
550 }
551 }
552 }
553 return NULL;
554}
555
556/* *****************************************************************************
557 * PUBLIC MEMBERS
558 * ****************************************************************************/
559
560use_logging_cpp(BaseOverlay);
561
562// ----------------------------------------------------------------------------
563
564BaseOverlay::BaseOverlay() :
565 started(false),state(BaseOverlayStateInvalid),
566 bc(NULL),
567 nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
568 sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
569 counter(0) {
570}
571
572BaseOverlay::~BaseOverlay() {
573}
574
575// ----------------------------------------------------------------------------
576
577void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) {
578 logging_info("Starting...");
579
580 // set parameters
581 bc = &_basecomm;
582 nodeId = _nodeid;
583
584 // register at base communication
585 bc->registerMessageReceiver( this );
586 bc->registerEventListener( this );
587
588 // timer for auto link management
589 Timer::setInterval( 1000 );
590 Timer::start();
591
592 started = true;
593 state = BaseOverlayStateInvalid;
594}
595
596void BaseOverlay::stop() {
597 logging_info("Stopping...");
598
599 // stop timer
600 Timer::stop();
601
602 // delete oberlay interface
603 if(overlayInterface != NULL) {
604 delete overlayInterface;
605 overlayInterface = NULL;
606 }
607
608 // unregister at base communication
609 bc->unregisterMessageReceiver( this );
610 bc->unregisterEventListener( this );
611
612 started = false;
613 state = BaseOverlayStateInvalid;
614}
615
616bool BaseOverlay::isStarted(){
617 return started;
618}
619
620// ----------------------------------------------------------------------------
621
622void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
623 const EndpointDescriptor& bootstrapEp) {
624
625 if(id != spovnetId){
626 logging_error("attempt to join against invalid spovnet, call initiate first");
627 return;
628 }
629
630 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
631 logging_info( "Starting to join spovnet " << id.toString() <<
632 " with nodeid " << nodeId.toString());
633
634 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
635
636 //** FIRST STEP - MANDATORY */
637
638 // bootstrap against ourselfs
639 logging_info("joining spovnet locally");
640
641 overlayInterface->joinOverlay();
642 state = BaseOverlayStateCompleted;
643 BOOST_FOREACH( NodeListener* i, nodeListeners )
644 i->onJoinCompleted( spovnetId );
645
646 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
647 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
648
649 } else {
650
651 //** SECOND STEP - OPTIONAL */
652
653 // bootstrap against another node
654 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
655
656 const LinkID& lnk = bc->establishLink( bootstrapEp );
657 bootstrapLinks.push_back(lnk);
658 logging_info("join process initiated for " << id.toString() << "...");
659 }
660}
661
662
663void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
664 logging_debug("starting overlay bootstrap module");
665 overlayBootstrap.start(this, spovnetId, nodeId, modules);
666 overlayBootstrap.publish(bc->getEndpointDescriptor());
667}
668
669void BaseOverlay::stopBootstrapModules(){
670 logging_debug("stopping overlay bootstrap module");
671 overlayBootstrap.stop();
672 overlayBootstrap.revoke();
673}
674
675void BaseOverlay::leaveSpoVNet() {
676
677 logging_info( "Leaving spovnet " << spovnetId );
678 bool ret = ( state != this->BaseOverlayStateInvalid );
679
680 logging_debug( "Dropping all auto-links" );
681
682 // gather all service links
683 vector<LinkID> servicelinks;
684 BOOST_FOREACH( LinkDescriptor* ld, links ) {
685 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
686 servicelinks.push_back( ld->overlayId );
687 }
688
689 // drop all service links
690 BOOST_FOREACH( LinkID lnk, servicelinks )
691 dropLink( lnk );
692
693 // let the node leave the spovnet overlay interface
694 logging_debug( "Leaving overlay" );
695 if( overlayInterface != NULL )
696 overlayInterface->leaveOverlay();
697
698 // drop still open bootstrap links
699 BOOST_FOREACH( LinkID lnk, bootstrapLinks )
700 bc->dropLink( lnk );
701
702 // change to inalid state
703 state = BaseOverlayStateInvalid;
704 //ovl.visShutdown( ovlId, nodeId, string("") );
705
706 visualInstance.visShutdown(visualIdOverlay, nodeId, "");
707 visualInstance.visShutdown(visualIdBase, nodeId, "");
708
709 // inform all registered services of the event
710 BOOST_FOREACH( NodeListener* i, nodeListeners ) {
711 if( ret ) i->onLeaveCompleted( spovnetId );
712 else i->onLeaveFailed( spovnetId );
713 }
714}
715
716void BaseOverlay::createSpoVNet(const SpoVNetID& id,
717 const OverlayParameterSet& param,
718 const SecurityParameterSet& sec,
719 const QoSParameterSet& qos) {
720
721 // set the state that we are an initiator, this way incoming messages are
722 // handled correctly
723 logging_info( "creating spovnet " + id.toString() <<
724 " with nodeid " << nodeId.toString() );
725
726 spovnetId = id;
727
728 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
729 if( overlayInterface == NULL ) {
730 logging_fatal( "overlay structure not supported" );
731 state = BaseOverlayStateInvalid;
732
733 BOOST_FOREACH( NodeListener* i, nodeListeners )
734 i->onJoinFailed( spovnetId );
735
736 return;
737 }
738
739 visualInstance.visCreate(visualIdBase, nodeId, "", "");
740 visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
741}
742
743// ----------------------------------------------------------------------------
744
745const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
746 const NodeID& remoteId, const ServiceID& service ) {
747
748 // establish link via overlay
749 if (!remoteId.isUnspecified())
750 return establishLink( remoteId, service );
751 else
752 return establishDirectLink(remoteEp, service );
753}
754
755/// call base communication's establish link and add link mapping
756const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
757 const ServiceID& service ) {
758
759 /// find a service listener
760 if( !communicationListeners.contains( service ) ) {
761 logging_error( "No listener registered for service id=" << service.toString() );
762 return LinkID::UNSPECIFIED;
763 }
764 CommunicationListener* listener = communicationListeners.get( service );
765 assert( listener != NULL );
766
767 // create descriptor
768 LinkDescriptor* ld = addDescriptor();
769 ld->relayed = false;
770 ld->listener = listener;
771 ld->service = service;
772 ld->communicationId = bc->establishLink( ep );
773
774 /// establish link and add mapping
775 logging_info("Establishing direct link " << ld->communicationId.toString()
776 << " using " << ep.toString());
777
778 return ld->communicationId;
779}
780
781/// establishes a link between two arbitrary nodes
782const LinkID BaseOverlay::establishLink( const NodeID& remote,
783 const ServiceID& service ) {
784
785 // do not establish a link to myself!
786 if (remote == nodeId) return LinkID::UNSPECIFIED;
787
788 // create a link descriptor
789 LinkDescriptor* ld = addDescriptor();
790 ld->relayed = true;
791 ld->remoteNode = remote;
792 ld->service = service;
793 ld->listener = getListener(ld->service);
794
795 // create link request message
796 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
797 msg.setSourceLink(ld->overlayId);
798
799 // send over relayed link
800 msg.setRelayed(true);
801 msg.setRegisterRelay(true);
802
803 // debug message
804 logging_info(
805 "Sending link request with"
806 << " link=" << ld->overlayId.toString()
807 << " node=" << ld->remoteNode.toString()
808 << " serv=" << ld->service.toString()
809 );
810
811 // sending message to node
812 send_node( &msg, ld->remoteNode, ld->service );
813
814 return ld->overlayId;
815}
816
817/// drops an established link
818void BaseOverlay::dropLink(const LinkID& link) {
819 logging_info( "Dropping link (initiated locally):" << link.toString() );
820
821 // find the link item to drop
822 LinkDescriptor* ld = getDescriptor(link);
823 if( ld == NULL ) {
824 logging_warn( "Can't drop link, link is unknown!");
825 return;
826 }
827
828 // delete all queued messages
829 if( ld->messageQueue.size() > 0 ) {
830 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
831 << ld->messageQueue.size() << " waiting messages" );
832 ld->flushQueue();
833 }
834
835 // inform sideport and listener
836 if(ld->listener != NULL)
837 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
838 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
839
840 // do not drop relay links
841 if (!ld->relaying) {
842 // drop the link in base communication
843 if (ld->communicationUp) bc->dropLink( ld->communicationId );
844
845 // erase descriptor
846 eraseDescriptor( ld->overlayId );
847 } else {
848 ld->dropAfterRelaying = true;
849 }
850}
851
852// ----------------------------------------------------------------------------
853
854/// internal send message, always use this functions to send messages over links
855seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) {
856 logging_debug( "Sending data message on link " << link.toString() );
857
858 // get the mapping for this link
859 LinkDescriptor* ld = getDescriptor(link);
860 if( ld == NULL ) {
861 logging_error("Could not send message. "
862 << "Link not found id=" << link.toString());
863 return -1;
864 }
865
866 // check if the link is up yet, if its an auto link queue message
867 if( !ld->up ) {
868 ld->setAutoUsed();
869 if( ld->autolink ) {
870 logging_info("Auto-link " << link.toString() << " not up, queue message");
871 Data data = data_serialize( message );
872 const_cast<Message*>(message)->dropPayload();
873 ld->messageQueue.push_back( new Message(data) );
874 } else {
875 logging_error("Link " << link.toString() << " not up, drop message");
876 }
877 return -1;
878 }
879
880 // compile overlay message (has service and node id)
881 OverlayMsg overmsg( OverlayMsg::typeData );
882 overmsg.encapsulate( const_cast<Message*>(message) );
883
884 // send message over relay/direct/overlay
885 return send_link( &overmsg, ld->overlayId );
886}
887
888
889seqnum_t BaseOverlay::sendMessage(const Message* message,
890 const NodeID& node, const ServiceID& service) {
891
892 // find link for node and service
893 LinkDescriptor* ld = getAutoDescriptor( node, service );
894
895 // if we found no link, create an auto link
896 if( ld == NULL ) {
897
898 // debug output
899 logging_info( "No link to send message to node "
900 << node.toString() << " found for service "
901 << service.toString() << ". Creating auto link ..."
902 );
903
904 // call base overlay to create a link
905 LinkID link = establishLink( node, service );
906 ld = getDescriptor( link );
907 if( ld == NULL ) {
908 logging_error( "Failed to establish auto-link.");
909 return -1;
910 }
911 ld->autolink = true;
912
913 logging_debug( "Auto-link establishment in progress to node "
914 << node.toString() << " with link id=" << link.toString() );
915 }
916 assert(ld != NULL);
917
918 // mark the link as used, as we now send a message through it
919 ld->setAutoUsed();
920
921 // send / queue message
922 return sendMessage( message, ld->overlayId );
923}
924
925
926NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message,
927 const NodeID& address, const ServiceID& service) {
928
929 if ( overlayInterface->isClosestNodeTo(address) )
930 {
931 return NodeID::UNSPECIFIED;
932 }
933
934 const NodeID& closest_node = overlayInterface->getNextNodeId(address);
935
936 if ( closest_node != NodeID::UNSPECIFIED )
937 {
938 seqnum_t seqnum = sendMessage(message, closest_node, service);
939 }
940
941 return closest_node; // XXX return seqnum ?? tuple? closest_node via (non const) reference?
942}
943// ----------------------------------------------------------------------------
944
945const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
946 const LinkID& link) const {
947
948 // return own end-point descriptor
949 if( link.isUnspecified() )
950 return bc->getEndpointDescriptor();
951
952 // find link descriptor. not found -> return unspecified
953 const LinkDescriptor* ld = getDescriptor(link);
954 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
955
956 // return endpoint-descriptor from base communication
957 return bc->getEndpointDescriptor( ld->communicationId );
958}
959
960const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
961 const NodeID& node) const {
962
963 // return own end-point descriptor
964 if( node == nodeId || node.isUnspecified() ) {
965 //logging_info("getEndpointDescriptor: returning self.");
966 return bc->getEndpointDescriptor();
967 }
968
969 // no joined and request remote descriptor? -> fail!
970 if( overlayInterface == NULL ) {
971 logging_error( "Overlay interface not set, cannot resolve end-point." );
972 return EndpointDescriptor::UNSPECIFIED();
973 }
974
975// // resolve end-point descriptor from the base-overlay routing table
976// const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
977// if(ep.toString() != "") return ep;
978
979 // see if we can find the node in our own table
980 BOOST_FOREACH(const LinkDescriptor* ld, links){
981 if(ld->remoteNode != node) continue;
982 if(!ld->communicationUp) continue;
983 const EndpointDescriptor& ep =
984 bc->getEndpointDescriptor(ld->communicationId);
985 if(ep != EndpointDescriptor::UNSPECIFIED()) {
986 //logging_info("getEndpointDescriptor: using " << ld->to_string());
987 return ep;
988 }
989 }
990
991 logging_warn( "No EndpointDescriptor found for node " << node );
992 logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
993
994 return EndpointDescriptor::UNSPECIFIED();
995}
996
997// ----------------------------------------------------------------------------
998
999bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1000 sideport = _sideport;
1001 _sideport->configure( this );
1002 return true;
1003}
1004
1005bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1006 sideport = &SideportListener::DEFAULT;
1007 return true;
1008}
1009
1010// ----------------------------------------------------------------------------
1011
1012bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1013 logging_debug( "binding communication listener " << listener
1014 << " on serviceid " << sid.toString() );
1015
1016 if( communicationListeners.contains( sid ) ) {
1017 logging_error( "some listener already registered for service id "
1018 << sid.toString() );
1019 return false;
1020 }
1021
1022 communicationListeners.registerItem( listener, sid );
1023 return true;
1024}
1025
1026
1027bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1028 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1029
1030 if( !communicationListeners.contains( sid ) ) {
1031 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1032 return false;
1033 }
1034
1035 if( communicationListeners.get(sid) != listener ) {
1036 logging_warn( "listener bound to service id " << sid.toString()
1037 << " is different than listener trying to unbind" );
1038 return false;
1039 }
1040
1041 communicationListeners.unregisterItem( sid );
1042 return true;
1043}
1044
1045// ----------------------------------------------------------------------------
1046
1047bool BaseOverlay::bind(NodeListener* listener) {
1048 logging_debug( "Binding node listener " << listener );
1049
1050 // already bound? yes-> warning
1051 NodeListenerVector::iterator i =
1052 find( nodeListeners.begin(), nodeListeners.end(), listener );
1053 if( i != nodeListeners.end() ) {
1054 logging_warn("Node listener " << listener << " is already bound!" );
1055 return false;
1056 }
1057
1058 // no-> add
1059 nodeListeners.push_back( listener );
1060 return true;
1061}
1062
1063bool BaseOverlay::unbind(NodeListener* listener) {
1064 logging_debug( "Unbinding node listener " << listener );
1065
1066 // already unbound? yes-> warning
1067 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1068 if( i == nodeListeners.end() ) {
1069 logging_warn( "Node listener " << listener << " is not bound!" );
1070 return false;
1071 }
1072
1073 // no-> remove
1074 nodeListeners.erase( i );
1075 return true;
1076}
1077
1078// ----------------------------------------------------------------------------
1079
1080void BaseOverlay::onLinkUp(const LinkID& id,
1081 const address_v* local, const address_v* remote) {
1082 logging_debug( "Link up with base communication link id=" << id );
1083
1084 // get descriptor for link
1085 LinkDescriptor* ld = getDescriptor(id, true);
1086
1087 // handle bootstrap link we initiated
1088 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1089 logging_info(
1090 "Join has been initiated by me and the link is now up. " <<
1091 "Sending out join request for SpoVNet " << spovnetId.toString()
1092 );
1093
1094 // send join request message
1095 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1096 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1097 JoinRequest joinRequest( spovnetId, nodeId );
1098 overlayMsg.encapsulate( &joinRequest );
1099 bc->sendMessage( id, &overlayMsg );
1100 return;
1101 }
1102
1103 // no link found? -> link establishment from remote, add one!
1104 if (ld == NULL) {
1105 ld = addDescriptor( id );
1106 logging_info( "onLinkUp (remote request) descriptor: " << ld );
1107
1108 // update descriptor
1109 ld->fromRemote = true;
1110 ld->communicationId = id;
1111 ld->communicationUp = true;
1112 ld->setAutoUsed();
1113 ld->setAlive();
1114
1115 // in this case, do not inform listener, since service it unknown
1116 // -> wait for update message!
1117
1118 // link mapping found? -> send update message with node-id and service id
1119 } else {
1120 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1121
1122 // update descriptor
1123 ld->setAutoUsed();
1124 ld->setAlive();
1125 ld->communicationUp = true;
1126 ld->fromRemote = false;
1127
1128 // if link is a relayed link->convert to direct link
1129 if (ld->relayed) {
1130 logging_info( "Converting to direct link: " << ld );
1131 ld->up = true;
1132 ld->relayed = false;
1133 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1134 overMsg.setSourceLink( ld->overlayId );
1135 overMsg.setDestinationLink( ld->remoteLink );
1136 send_link( &overMsg, ld->overlayId );
1137 } else {
1138 // note: necessary to validate the link on the remote side!
1139 logging_info( "Sending out update" <<
1140 " for service " << ld->service.toString() <<
1141 " with local node id " << nodeId.toString() <<
1142 " on link " << ld->overlayId.toString() );
1143
1144 // compile and send update message
1145 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1146 overlayMsg.setSourceLink(ld->overlayId);
1147 overlayMsg.setAutoLink( ld->autolink );
1148 send_link( &overlayMsg, ld->overlayId, true );
1149 }
1150 }
1151}
1152
1153void BaseOverlay::onLinkDown(const LinkID& id,
1154 const address_v* local, const address_v* remote) {
1155
1156 // erase bootstrap links
1157 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1158 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1159
1160 // get descriptor for link
1161 LinkDescriptor* ld = getDescriptor(id, true);
1162 if ( ld == NULL ) return; // not found? ->ignore!
1163 logging_info( "onLinkDown descriptor: " << ld );
1164
1165 // removing relay link information
1166 removeRelayLink(ld->overlayId);
1167
1168 // inform listeners about link down
1169 ld->communicationUp = false;
1170 if (!ld->service.isUnspecified()) {
1171 CommunicationListener* lst = getListener(ld->service);
1172 if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1173 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1174 }
1175
1176 // delete all queued messages (auto links)
1177 if( ld->messageQueue.size() > 0 ) {
1178 logging_warn( "Dropping link " << id.toString() << " that has "
1179 << ld->messageQueue.size() << " waiting messages" );
1180 ld->flushQueue();
1181 }
1182
1183 // erase mapping
1184 eraseDescriptor(ld->overlayId);
1185}
1186
1187void BaseOverlay::onLinkChanged(const LinkID& id,
1188 const address_v* oldlocal, const address_v* newlocal,
1189 const address_v* oldremote, const address_v* newremote) {
1190
1191 // get descriptor for link
1192 LinkDescriptor* ld = getDescriptor(id, true);
1193 if ( ld == NULL ) return; // not found? ->ignore!
1194 logging_debug( "onLinkChanged descriptor: " << ld );
1195
1196 // inform listeners
1197 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1198 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1199
1200 // autolinks: refresh timestamp
1201 ld->setAutoUsed();
1202}
1203
1204void BaseOverlay::onLinkFail(const LinkID& id,
1205 const address_v* local, const address_v* remote) {
1206 logging_debug( "Link fail with base communication link id=" << id );
1207
1208 // erase bootstrap links
1209 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1210 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1211
1212 // get descriptor for link
1213 LinkDescriptor* ld = getDescriptor(id, true);
1214 if ( ld == NULL ) return; // not found? ->ignore!
1215 logging_debug( "Link failed id=" << ld->overlayId.toString() );
1216
1217 // inform listeners
1218 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1219 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1220}
1221
1222void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local,
1223 const address_v* remote, const QoSParameterSet& qos) {
1224 logging_debug( "Link quality changed with base communication link id=" << id );
1225
1226 // get descriptor for link
1227 LinkDescriptor* ld = getDescriptor(id, true);
1228 if ( ld == NULL ) return; // not found? ->ignore!
1229 logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1230}
1231
1232bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local,
1233 const address_v* remote ) {
1234 logging_debug("Accepting link request from " << remote->to_string() );
1235 return true;
1236}
1237
1238/// handles a message from base communication
1239bool BaseOverlay::receiveMessage(const Message* message,
1240 const LinkID& link, const NodeID& ) {
1241 // get descriptor for link
1242 LinkDescriptor* ld = getDescriptor( link, true );
1243 return handleMessage( message, ld, link );
1244}
1245
1246// ----------------------------------------------------------------------------
1247
1248/// Handle spovnet instance join requests
1249bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1250
1251 // decapsulate message
1252 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
1253 logging_info( "Received join request for spovnet " <<
1254 joinReq->getSpoVNetID().toString() );
1255
1256 // check spovnet id
1257 if( joinReq->getSpoVNetID() != spovnetId ) {
1258 logging_error(
1259 "Received join request for spovnet we don't handle " <<
1260 joinReq->getSpoVNetID().toString() );
1261 delete joinReq;
1262 return false;
1263 }
1264
1265 // TODO: here you can implement mechanisms to deny joining of a node
1266 bool allow = true;
1267 logging_info( "Sending join reply for spovnet " <<
1268 spovnetId.toString() << " to node " <<
1269 overlayMsg->getSourceNode().toString() <<
1270 ". Result: " << (allow ? "allowed" : "denied") );
1271 joiningNodes.push_back( overlayMsg->getSourceNode() );
1272
1273 // return overlay parameters
1274 assert( overlayInterface != NULL );
1275 logging_debug( "Using bootstrap end-point "
1276 << getEndpointDescriptor().toString() )
1277 OverlayParameterSet parameters = overlayInterface->getParameters();
1278 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1279 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1280 JoinReply replyMsg( spovnetId, parameters,
1281 allow, getEndpointDescriptor() );
1282 retmsg.encapsulate(&replyMsg);
1283 bc->sendMessage( bcLink, &retmsg );
1284
1285 delete joinReq;
1286 return true;
1287}
1288
1289/// Handle replies to spovnet instance join requests
1290bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) {
1291 // decapsulate message
1292 logging_debug("received join reply message");
1293 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
1294
1295 // correct spovnet?
1296 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail
1297 logging_error( "Received SpoVNet join reply for " <<
1298 replyMsg->getSpoVNetID().toString() <<
1299 " != " << spovnetId.toString() );
1300 delete replyMsg;
1301 return false;
1302 }
1303
1304 // access granted? no -> fail
1305 if( !replyMsg->getJoinAllowed() ) {
1306 logging_error( "Our join request has been denied" );
1307
1308 // drop initiator link
1309 if( !bcLink.isUnspecified() ){
1310 bc->dropLink( bcLink );
1311
1312 vector<LinkID>::iterator it = std::find(
1313 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1314 if( it != bootstrapLinks.end() )
1315 bootstrapLinks.erase(it);
1316 }
1317
1318 // inform all registered services of the event
1319 BOOST_FOREACH( NodeListener* i, nodeListeners )
1320 i->onJoinFailed( spovnetId );
1321
1322 delete replyMsg;
1323 return true;
1324 }
1325
1326 // access has been granted -> continue!
1327 logging_info("Join request has been accepted for spovnet " <<
1328 spovnetId.toString() );
1329
1330 logging_debug( "Using bootstrap end-point "
1331 << replyMsg->getBootstrapEndpoint().toString() );
1332
1333 // create overlay structure from spovnet parameter set
1334 // if we have not boostrapped yet against some other node
1335 if( overlayInterface == NULL ){
1336
1337 logging_debug("first-time bootstrapping");
1338
1339 overlayInterface = OverlayFactory::create(
1340 *this, replyMsg->getParam(), nodeId, this );
1341
1342 // overlay structure supported? no-> fail!
1343 if( overlayInterface == NULL ) {
1344 logging_error( "overlay structure not supported" );
1345
1346 if( !bcLink.isUnspecified() ){
1347 bc->dropLink( bcLink );
1348
1349 vector<LinkID>::iterator it = std::find(
1350 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1351 if( it != bootstrapLinks.end() )
1352 bootstrapLinks.erase(it);
1353 }
1354
1355 // inform all registered services of the event
1356 BOOST_FOREACH( NodeListener* i, nodeListeners )
1357 i->onJoinFailed( spovnetId );
1358
1359 delete replyMsg;
1360 return true;
1361 }
1362
1363 // everything ok-> join the overlay!
1364 state = BaseOverlayStateCompleted;
1365 overlayInterface->createOverlay();
1366
1367 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1368 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1369
1370 // update ovlvis
1371 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1372
1373 // inform all registered services of the event
1374 BOOST_FOREACH( NodeListener* i, nodeListeners )
1375 i->onJoinCompleted( spovnetId );
1376
1377 delete replyMsg;
1378
1379 } else {
1380
1381 // this is not the first bootstrap, just join the additional node
1382 logging_debug("not first-time bootstrapping");
1383 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
1384 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );
1385
1386 delete replyMsg;
1387
1388 } // if( overlayInterface == NULL )
1389
1390 return true;
1391}
1392
1393
1394bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1395 // get service
1396 const ServiceID& service = overlayMsg->getService();
1397 logging_debug( "Received data for service " << service.toString()
1398 << " on link " << overlayMsg->getDestinationLink().toString() );
1399
1400 // delegate data message
1401 CommunicationListener* lst = getListener(service);
1402 if(lst != NULL){
1403 lst->onMessage(
1404 overlayMsg,
1405 overlayMsg->getSourceNode(),
1406 overlayMsg->getDestinationLink()
1407 );
1408 }
1409
1410 return true;
1411}
1412
1413
1414bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1415
1416 if( ld == NULL ) {
1417 logging_warn( "received overlay update message for link for "
1418 << "which we have no mapping" );
1419 return false;
1420 }
1421 logging_info("Received type update message on link " << ld );
1422
1423 // update our link mapping information for this link
1424 bool changed =
1425 ( ld->remoteNode != overlayMsg->getSourceNode() )
1426 || ( ld->service != overlayMsg->getService() );
1427
1428 // set parameters
1429 ld->up = true;
1430 ld->remoteNode = overlayMsg->getSourceNode();
1431 ld->remoteLink = overlayMsg->getSourceLink();
1432 ld->service = overlayMsg->getService();
1433 ld->autolink = overlayMsg->isAutoLink();
1434
1435 // if our link information changed, we send out an update, too
1436 if( changed ) {
1437 overlayMsg->swapRoles();
1438 overlayMsg->setSourceNode(nodeId);
1439 overlayMsg->setSourceLink(ld->overlayId);
1440 overlayMsg->setService(ld->service);
1441 send( overlayMsg, ld );
1442 }
1443
1444 // service registered? no-> error!
1445 if( !communicationListeners.contains( ld->service ) ) {
1446 logging_warn( "Link up: event listener has not been registered" );
1447 return false;
1448 }
1449
1450 // default or no service registered?
1451 CommunicationListener* listener = communicationListeners.get( ld->service );
1452 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
1453 logging_warn("Link up: event listener is default or null!" );
1454 return true;
1455 }
1456
1457 // update descriptor
1458 ld->listener = listener;
1459 ld->setAutoUsed();
1460 ld->setAlive();
1461
1462 // ask the service whether it wants to accept this link
1463 if( !listener->onLinkRequest(ld->remoteNode) ) {
1464
1465 logging_debug("Link id=" << ld->overlayId.toString() <<
1466 " has been denied by service " << ld->service.toString() << ", dropping link");
1467
1468 // prevent onLinkDown calls to the service
1469 ld->listener = &CommunicationListener::DEFAULT;
1470
1471 // drop the link
1472 dropLink( ld->overlayId );
1473 return true;
1474 }
1475
1476 // set link up
1477 ld->up = true;
1478 logging_info( "Link has been accepted by service and is up: " << ld );
1479
1480 // auto links: link has been accepted -> send queued messages
1481 if( ld->messageQueue.size() > 0 ) {
1482 logging_info( "Sending out queued messages on link " << ld );
1483 BOOST_FOREACH( Message* msg, ld->messageQueue ) {
1484 sendMessage( msg, ld->overlayId );
1485 delete msg;
1486 }
1487 ld->messageQueue.clear();
1488 }
1489
1490 // call the notification functions
1491 listener->onLinkUp( ld->overlayId, ld->remoteNode );
1492 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
1493
1494 return true;
1495}
1496
1497/// handle a link request and reply
1498bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1499 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );
1500
1501 //TODO: Check if a request has already been sent using getSourceLink() ...
1502
1503 // create link descriptor
1504 LinkDescriptor* ldn = addDescriptor();
1505
1506 // flags
1507 ldn->up = true;
1508 ldn->fromRemote = true;
1509 ldn->relayed = true;
1510
1511 // parameters
1512 ldn->service = overlayMsg->getService();
1513 ldn->listener = getListener(ldn->service);
1514 ldn->remoteNode = overlayMsg->getSourceNode();
1515 ldn->remoteLink = overlayMsg->getSourceLink();
1516
1517 // update time-stamps
1518 ldn->setAlive();
1519 ldn->setAutoUsed();
1520
1521 // create reply message and send back!
1522 overlayMsg->swapRoles(); // swap source/destination
1523 overlayMsg->setType(OverlayMsg::typeLinkReply);
1524 overlayMsg->setSourceLink(ldn->overlayId);
1525 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );
1526 overlayMsg->setRelayed(true);
1527 send( overlayMsg, ld ); // send back to link
1528
1529 // inform listener
1530 if(ldn != NULL && ldn->listener != NULL)
1531 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1532
1533 return true;
1534}
1535
1536bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1537
1538 // find link request
1539 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
1540
1541 // not found? yes-> drop with error!
1542 if (ldn == NULL) {
1543 logging_error( "No link request pending for "
1544 << overlayMsg->getDestinationLink().toString() );
1545 return false;
1546 }
1547 logging_debug("Handling link reply for " << ldn )
1548
1549 // check if already up
1550 if (ldn->up) {
1551 logging_warn( "Link already up: " << ldn );
1552 return true;
1553 }
1554
1555 // debug message
1556 logging_debug( "Link request reply received. Establishing link"
1557 << " for service " << overlayMsg->getService().toString()
1558 << " with local id=" << overlayMsg->getDestinationLink()
1559 << " and remote link id=" << overlayMsg->getSourceLink()
1560 << " to " << overlayMsg->getSourceEndpoint().toString()
1561 );
1562
1563 // set local link descriptor data
1564 ldn->up = true;
1565 ldn->relayed = true;
1566 ldn->service = overlayMsg->getService();
1567 ldn->listener = getListener(ldn->service);
1568 ldn->remoteLink = overlayMsg->getSourceLink();
1569 ldn->remoteNode = overlayMsg->getSourceNode();
1570
1571 // update timestamps
1572 ldn->setAlive();
1573 ldn->setAutoUsed();
1574
1575 // auto links: link has been accepted -> send queued messages
1576 if( ldn->messageQueue.size() > 0 ) {
1577 logging_info( "Sending out queued messages on link " <<
1578 ldn->overlayId.toString() );
1579 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {
1580 sendMessage( msg, ldn->overlayId );
1581 delete msg;
1582 }
1583 ldn->messageQueue.clear();
1584 }
1585
1586 // inform listeners about new link
1587 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
1588
1589 // try to replace relay link with direct link
1590 ldn->retryCounter = 3;
1591 ldn->endpoint = overlayMsg->getSourceEndpoint();
1592 ldn->communicationId = bc->establishLink( ldn->endpoint );
1593
1594 return true;
1595}
1596
1597/// handle a keep-alive message for a link
1598bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1599 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
1600 if ( rld != NULL ) {
1601 logging_debug("Keep-Alive for " <<
1602 overlayMsg->getDestinationLink() );
1603 if (overlayMsg->isRouteRecord())
1604 rld->routeRecord = overlayMsg->getRouteRecord();
1605 rld->setAlive();
1606 return true;
1607 } else {
1608 logging_error("Keep-Alive for "
1609 << overlayMsg->getDestinationLink() << ": link unknown." );
1610 return false;
1611 }
1612}
1613
1614/// handle a direct link message
1615bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
1616 logging_debug( "Received direct link replacement request" );
1617
1618 /// get destination overlay link
1619 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
1620 if (rld == NULL || ld == NULL) {
1621 logging_error("Direct link replacement: Link "
1622 << overlayMsg->getDestinationLink() << "not found error." );
1623 return false;
1624 }
1625 logging_info( "Received direct link convert notification for " << rld );
1626
1627 // update information
1628 rld->communicationId = ld->communicationId;
1629 rld->communicationUp = true;
1630 rld->relayed = false;
1631
1632 // mark used and alive!
1633 rld->setAlive();
1634 rld->setAutoUsed();
1635
1636 // erase the original descriptor
1637 eraseDescriptor(ld->overlayId);
1638 return true;
1639}
1640
1641/// handles an incoming message
1642bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld,
1643 const LinkID bcLink ) {
1644 logging_debug( "Handling message: " << message->toString());
1645
1646 // decapsulate overlay message
1647 OverlayMsg* overlayMsg =
1648 const_cast<Message*>(message)->decapsulate<OverlayMsg>();
1649 if( overlayMsg == NULL ) return false;
1650
1651 // increase number of hops
1652 overlayMsg->increaseNumHops();
1653
1654 // refresh relay information
1655 refreshRelayInformation( overlayMsg, ld );
1656
1657 // update route record
1658 overlayMsg->addRouteRecord(nodeId);
1659
1660 // handle signaling messages (do not route!)
1661 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
1662 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {
1663 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
1664 delete overlayMsg;
1665 return true;
1666 }
1667
1668 // message for reached destination? no-> route message
1669 if (!overlayMsg->getDestinationNode().isUnspecified() &&
1670 overlayMsg->getDestinationNode() != nodeId ) {
1671 logging_debug("Routing message "
1672 << " from " << overlayMsg->getSourceNode()
1673 << " to " << overlayMsg->getDestinationNode()
1674 );
1675 route( overlayMsg );
1676 delete overlayMsg;
1677 return true;
1678 }
1679
1680 // handle base overlay message
1681 bool ret = false; // return value
1682 switch ( overlayMsg->getType() ) {
1683
1684 // data transport messages
1685 case OverlayMsg::typeData:
1686 ret = handleData(overlayMsg, ld); break;
1687
1688 // overlay setup messages
1689 case OverlayMsg::typeJoinRequest:
1690 ret = handleJoinRequest(overlayMsg, bcLink ); break;
1691 case OverlayMsg::typeJoinReply:
1692 ret = handleJoinReply(overlayMsg, bcLink ); break;
1693
1694 // link specific messages
1695 case OverlayMsg::typeLinkRequest:
1696 ret = handleLinkRequest(overlayMsg, ld ); break;
1697 case OverlayMsg::typeLinkReply:
1698 ret = handleLinkReply(overlayMsg, ld ); break;
1699 case OverlayMsg::typeLinkUpdate:
1700 ret = handleLinkUpdate(overlayMsg, ld ); break;
1701 case OverlayMsg::typeLinkAlive:
1702 ret = handleLinkAlive(overlayMsg, ld ); break;
1703 case OverlayMsg::typeLinkDirect:
1704 ret = handleLinkDirect(overlayMsg, ld ); break;
1705
1706 // handle unknown message type
1707 default: {
1708 logging_error( "received message in invalid state! don't know " <<
1709 "what to do with this message of type " << overlayMsg->getType() );
1710 ret = false;
1711 break;
1712 }
1713 }
1714
1715 // free overlay message and return value
1716 delete overlayMsg;
1717 return ret;
1718}
1719
1720// ----------------------------------------------------------------------------
1721
1722void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) {
1723
1724 logging_debug( "broadcasting message to all known nodes " <<
1725 "in the overlay from service " + service.toString() );
1726
1727 if(message == NULL) return;
1728 message->setReleasePayload(false);
1729
1730 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
1731 for(size_t i=0; i<nodes.size(); i++){
1732 NodeID& id = nodes.at(i);
1733 if(id == this->nodeId) continue; // don't send to ourselfs
1734 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send
1735 sendMessage( message, id, service );
1736 }
1737}
1738
1739/// return the overlay neighbors
1740vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
1741 // the known nodes _can_ also include our node, so we remove ourself
1742 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
1743 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1744 if( i != nodes.end() ) nodes.erase( i );
1745 return nodes;
1746}
1747
1748const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1749 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1750 const LinkDescriptor* ld = getDescriptor(lid);
1751 if( ld == NULL ) return NodeID::UNSPECIFIED;
1752 else return ld->remoteNode;
1753}
1754
1755vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1756 vector<LinkID> linkvector;
1757 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1758 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
1759 linkvector.push_back( ld->overlayId );
1760 }
1761 }
1762 return linkvector;
1763}
1764
1765
1766void BaseOverlay::onNodeJoin(const NodeID& node) {
1767 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1768 if( i == joiningNodes.end() ) return;
1769
1770 logging_info( "node has successfully joined baseoverlay and overlay structure "
1771 << node.toString() );
1772
1773 joiningNodes.erase( i );
1774}
1775
1776void BaseOverlay::eventFunction() {
1777 stabilizeRelays();
1778 stabilizeLinks();
1779 updateVisual();
1780}
1781
1782void BaseOverlay::updateVisual(){
1783
1784 //
1785 // update base overlay structure
1786 //
1787
1788 static NodeID pre = NodeID::UNSPECIFIED;
1789 static NodeID suc = NodeID::UNSPECIFIED;
1790
1791 vector<NodeID> nodes = this->getOverlayNeighbors(false);
1792
1793 if(nodes.size() == 0){
1794
1795 if(pre != NodeID::UNSPECIFIED){
1796 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1797 pre = NodeID::UNSPECIFIED;
1798 }
1799 if(suc != NodeID::UNSPECIFIED){
1800 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1801 suc = NodeID::UNSPECIFIED;
1802 }
1803
1804 } // if(nodes.size() == 0)
1805
1806 if(nodes.size() == 1){
1807 // only one node, make this pre and succ
1808 // and then go into the node.size()==2 case
1809 //nodes.push_back(nodes.at(0));
1810
1811 if(pre != nodes.at(0)){
1812 pre = nodes.at(0);
1813 if(pre != NodeID::UNSPECIFIED)
1814 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
1815 }
1816 }
1817
1818 if(nodes.size() == 2){
1819
1820 // old finger
1821 if(nodes.at(0) != pre){
1822 if(pre != NodeID::UNSPECIFIED)
1823 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
1824 pre = NodeID::UNSPECIFIED;
1825 }
1826 if(nodes.at(1) != suc){
1827 if(suc != NodeID::UNSPECIFIED)
1828 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
1829 suc = NodeID::UNSPECIFIED;
1830 }
1831
1832 // connect with fingers
1833 if(pre == NodeID::UNSPECIFIED){
1834 pre = nodes.at(0);
1835 if(pre != NodeID::UNSPECIFIED)
1836 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
1837 }
1838 if(suc == NodeID::UNSPECIFIED){
1839 suc = nodes.at(1);
1840 if(suc != NodeID::UNSPECIFIED)
1841 visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
1842 }
1843
1844 } //if(nodes.size() == 2)
1845
1846// {
1847// logging_error("================================");
1848// logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
1849// logging_error("================================");
1850// if(nodes.size()>= 1){
1851// logging_error("real pre " << nodes.at(0).toString());
1852// logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
1853// }
1854// if(nodes.size()>= 2){
1855// logging_error("real suc " << nodes.at(1).toString());
1856// logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
1857// }
1858// logging_error("================================");
1859// if(pre == NodeID::UNSPECIFIED){
1860// logging_error("pre: unspecified");
1861// }else{
1862// unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
1863// logging_error("pre: " << prei);
1864// }
1865// if(suc == NodeID::UNSPECIFIED){
1866// logging_error("suc: unspecified");
1867// }else{
1868// unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
1869// logging_error("suc: " << suci);
1870// }
1871// logging_error("================================");
1872// }
1873
1874 //
1875 // update base communication links
1876 //
1877
1878 static set<NodeID> linkset;
1879 set<NodeID> remotenodes;
1880 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1881 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
1882 continue;
1883
1884 if (ld->routeRecord.size()>1 && ld->relayed) {
1885 for (size_t i=1; i<ld->routeRecord.size(); i++)
1886 remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
1887 } else {
1888 remotenodes.insert(ld->remoteNode);
1889 }
1890 }
1891
1892 // which links are old and need deletion?
1893 bool changed = false;
1894
1895 do{
1896 changed = false;
1897 BOOST_FOREACH(NodeID n, linkset){
1898 if(remotenodes.find(n) == remotenodes.end()){
1899 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
1900 linkset.erase(n);
1901 changed = true;
1902 break;
1903 }
1904 }
1905 }while(changed);
1906
1907 // which links are new and need creation?
1908 do{
1909 changed = false;
1910 BOOST_FOREACH(NodeID n, remotenodes){
1911 if(linkset.find(n) == linkset.end()){
1912 visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
1913 linkset.insert(n);
1914 changed = true;
1915 break;
1916 }
1917 }
1918 }while(changed);
1919
1920}
1921
1922// ----------------------------------------------------------------------------
1923
1924std::string BaseOverlay::debugInformation() {
1925 std::stringstream s;
1926 int i=0;
1927
1928 // dump overlay information
1929 s << "Long debug info ... [see below]" << endl << endl;
1930 s << "--- overlay information ----------------------" << endl;
1931 s << overlayInterface->debugInformation() << endl;
1932
1933 // dump link state
1934 s << "--- link state -------------------------------" << endl;
1935 BOOST_FOREACH( LinkDescriptor* ld, links ) {
1936 s << "link " << i << ": " << ld << endl;
1937 i++;
1938 }
1939 s << endl << endl;
1940
1941 return s.str();
1942}
1943
1944}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.