close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 12459

Last change on this file since 12459 was 12438, checked in by hock@…, 11 years ago

new callbacks in ariba::NodeListener:

  • onOverlayConnected
  • onOverlayDisconnected
File size: 79.7 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59#include <ariba/utility/misc/sha1.h>
60
61namespace ariba {
62namespace overlay {
63
64using namespace std;
65using ariba::transport::system_priority;
66
67#define visualInstance ariba::utility::DddVis::instance()
68#define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
69#define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
70
71
72// time constants (in seconds)
73#define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s
74
75#define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up
76#define KEEP_ALIVE_TIME_OUT KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT // timeout: no data received on this link (incl. keep-alive messages)
77#define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s
78
79
80// ----------------------------------------------------------------------------
81
82/* *****************************************************************************
83 * PREREQUESITES
84 * ****************************************************************************/
85
86CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
87 if( !communicationListeners.contains( service ) ) {
88 logging_info( "No listener found for service " << service.toString() );
89 return NULL;
90 }
91 CommunicationListener* listener = communicationListeners.get( service );
92 assert( listener != NULL );
93 return listener;
94}
95
96// link descriptor handling ----------------------------------------------------
97
98LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
99 foreach( LinkDescriptor* lp, links )
100 if ((communication ? lp->communicationId : lp->overlayId) == link)
101 return lp;
102 return NULL;
103}
104
105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
106 foreach( const LinkDescriptor* lp, links )
107 if ((communication ? lp->communicationId : lp->overlayId) == link)
108 return lp;
109 return NULL;
110}
111
112/// erases a link descriptor
113void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
114 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
115 LinkDescriptor* ld = *i;
116 if ((communication ? ld->communicationId : ld->overlayId) == link) {
117 delete ld;
118 links.erase(i);
119 break;
120 }
121 }
122}
123
124/// adds a link descriptor
125LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
126 LinkDescriptor* desc = getDescriptor( link );
127 if ( desc == NULL ) {
128 desc = new LinkDescriptor();
129 if (!link.isUnspecified()) desc->overlayId = link;
130 links.push_back(desc);
131 }
132 return desc;
133}
134
135/// returns a auto-link descriptor
136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
137{
138 // search for a descriptor that is already up
139 foreach( LinkDescriptor* lp, links )
140 {
141 if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
142 return lp;
143 }
144
145 // if not found, search for one that is about to come up...
146 foreach( LinkDescriptor* lp, links )
147 {
148 time_t now = time(NULL);
149
150 if (lp->autolink && lp->remoteNode == node && lp->service == service
151 && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
152 return lp;
153 }
154
155 return NULL;
156}
157
158/// stabilizes link information
159void BaseOverlay::stabilizeLinks() {
160 time_t now = time(NULL);
161
162 // send keep-alive messages over established links
163 foreach( LinkDescriptor* ld, links )
164 {
165 if (!ld->up) continue;
166
167 if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
168 {
169 logging_debug("[BaseOverlay] Sending KeepAlive over "
170 << ld->to_string()
171 << " after "
172 << difftime( now, ld->keepAliveSent )
173 << "s");
174
175 OverlayMsg msg( OverlayMsg::typeKeepAlive,
176 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
177 msg.setRouteRecord(true);
178 ld->keepAliveSent = now;
179 send_link( &msg, ld->overlayId, system_priority::OVERLAY );
180 }
181 }
182
183 // iterate over all links and check for time boundaries
184 vector<LinkDescriptor*> oldlinks;
185 foreach( LinkDescriptor* ld, links ) {
186
187 // link connection request stale?
188 if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request
189 {
190 logging_info( "Link connection request is stale, closing: " << ld );
191 ld->failed = true;
192 oldlinks.push_back( ld );
193 continue;
194 }
195
196 if (!ld->up) continue;
197
198
199
200
201 // check if link is relayed and retry connecting directly
202 // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
203 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
204 ld->retryCounter--;
205 ld->communicationId = bc->establishLink( ld->endpoint );
206 }
207
208 // remote used as relay flag
209 if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ??
210 ld->relaying = false;
211
212 // drop links that are dropped and not used as relay
213 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
214 oldlinks.push_back( ld );
215 continue;
216 }
217
218 // auto-link time exceeded?
219 if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
220 oldlinks.push_back( ld );
221 continue;
222 }
223
224 // keep alives missed? yes->
225 if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
226 {
227 logging_info( "Link is stale, closing: " << ld );
228 ld->failed = true;
229 oldlinks.push_back( ld );
230 continue;
231 }
232 }
233
234 // drop links
235 foreach( LinkDescriptor* ld, oldlinks ) {
236 logging_info( "Link timed out. Dropping " << ld );
237 ld->relaying = false;
238 dropLink( ld->overlayId );
239 }
240
241
242
243
244 // show link state (debug output)
245 if (counter>=10 || counter<0)
246 {
247 showLinks();
248 counter = 0;
249 }
250 else
251 {
252 counter++;
253 }
254}
255
256
257std::string BaseOverlay::getLinkHTMLInfo() {
258 std::ostringstream s;
259 vector<NodeID> nodes;
260 if (links.size()==0) {
261 s << "<h2 style=\"color=#606060\">No links established!</h2>";
262 } else {
263 s << "<h2 style=\"color=#606060\">Links</h2>";
264 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
265 s << "<tr style=\"background-color=#ffe0e0\">";
266 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
267 s << "</tr>";
268
269 int i=0;
270 foreach( LinkDescriptor* ld, links ) {
271 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
272 bool found = false;
273 foreach(NodeID& id, nodes)
274 if (id == ld->remoteNode) found = true;
275 if (found) continue;
276 i++;
277 nodes.push_back(ld->remoteNode);
278 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
279 else s << "<tr>";
280 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
281 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
282 s << "<td>";
283 if (ld->routeRecord.size()>1 && ld->relayed) {
284 for (size_t i=1; i<ld->routeRecord.size(); i++)
285 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
286 } else {
287 s << "Direct";
288 }
289 s << "</td>";
290 s << "</tr>";
291 }
292 s << "</table>";
293 }
294 return s.str();
295}
296
297/// shows the current link state
298void BaseOverlay::showLinks() {
299 int i=0;
300 logging_info("--- link state -------------------------------");
301 foreach( LinkDescriptor* ld, links ) {
302 string epd = "";
303 if (isLinkDirectVital(ld))
304 {
305// epd = getEndpointDescriptor(ld->remoteNode).toString();
306
307 epd = "Connection: ";
308 epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
309 epd += " <---> ";
310 epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
311 }
312
313 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
314 i++;
315 }
316 logging_info("----------------------------------------------");
317}
318
319/// compares two arbitrary links to the same node
320int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
321 LinkDescriptor* lhsld = getDescriptor(lhs);
322 LinkDescriptor* rhsld = getDescriptor(rhs);
323 if (lhsld==NULL || rhsld==NULL
324 || !lhsld->up || !rhsld->up
325 || lhsld->remoteNode != rhsld->remoteNode) return -1;
326
327 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
328 return -1;
329
330 return 1;
331}
332
333
334// internal message delivery ---------------------------------------------------
335
336
337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
338{
339 // set priority
340 message->setPriority(priority);
341
342 // wrap old OverlayMsg into reboost message
343 reboost::message_t wrapped_message = message->wrap_up_for_sending();
344
345 // send down to BaseCommunication
346 try
347 {
348 // * send *
349 return bc->sendMessage(bc_link, wrapped_message, priority, false);
350 }
351 catch ( communication::communication_message_not_sent& e )
352 {
353 ostringstream out;
354 out << "Communication message not sent: " << e.what();
355 throw message_not_sent(out.str());
356 }
357
358 throw logic_error("This should never happen!");
359}
360
361
362/// routes a message to its destination node
363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
364{
365 // exceeded time-to-live? yes-> drop message
366 if (message->getNumHops() > message->getTimeToLive())
367 {
368 logging_warn("Message exceeded TTL. Dropping message and relay routes "
369 << "for recovery. Hop count: " << (int) message->getNumHops());
370 removeRelayNode(message->getDestinationNode());
371 return;
372 }
373
374 // no-> forward message
375 else
376 {
377 // destinastion myself? yes-> handle message
378 if (message->getDestinationNode() == nodeId)
379 {
380 logging_warn("Usually I should not route messages to myself. And I won't!");
381 }
382
383 // no->send message to next hop
384 else
385 {
386 try
387 {
388 /* (deep) packet inspection to determine priority */
389 // BRANCH: typeData --> send with low priority
390 if ( message->getType() == OverlayMsg::typeData )
391 {
392 // TODO think about implementing explicit routing queue (with active queue management??)
393 send( message,
394 message->getDestinationNode(),
395 message->getPriority(),
396 last_hop );
397 }
398 // BRANCH: internal message --> send with higher priority
399 else
400 {
401 send( message,
402 message->getDestinationNode(),
403 system_priority::HIGH,
404 last_hop );
405 }
406 }
407 catch ( message_not_sent& e )
408 {
409 logging_warn("Unable to route message of type "
410 << message->getType()
411 << " to "
412 << message->getDestinationNode()
413 << ". Reason: "
414 << e.what());
415
416 // inform sender
417 if ( message->getType() != OverlayMsg::typeMessageLost )
418 {
419 report_lost_message(message);
420 }
421 }
422 }
423 }
424}
425
426void BaseOverlay::report_lost_message( const OverlayMsg* message )
427{
428 OverlayMsg reply(OverlayMsg::typeMessageLost);
429 reply.setSeqNum(message->getSeqNum());
430
431 /**
432 * MessageLost-Message
433 *
434 * - Type of lost message
435 * - Hop count of lost message
436 * - Source-LinkID of lost message
437 */
438 reboost::shared_buffer_t b(sizeof(uint8_t)*2);
439 b.mutable_data()[0] = message->getType();
440 b.mutable_data()[1] = message->getNumHops();
441 reply.append_buffer(b);
442 reply.append_buffer(message->getSourceLink().serialize());
443
444 try
445 {
446 send_node(&reply, message->getSourceNode(),
447 system_priority::OVERLAY,
448 OverlayInterface::OVERLAY_SERVICE_ID);
449 }
450 catch ( message_not_sent& e )
451 {
452 logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
453 }
454}
455
456/// sends a message to another node, delivers it to the base overlay class
457seqnum_t BaseOverlay::send( OverlayMsg* message,
458 const NodeID& destination,
459 uint8_t priority,
460 const NodeID& last_hop ) throw(message_not_sent)
461{
462 LinkDescriptor* next_link = NULL;
463
464 // drop messages to unspecified destinations
465 if (destination.isUnspecified())
466 throw message_not_sent("No destination specified. Drop!");
467
468 // send messages to myself -> drop!
469 // TODO maybe this is not what we want. why not just deliver this message?
470 // There is a similar check in the route function, there it should be okay.
471 if (destination == nodeId)
472 {
473 logging_warn("Sent message to myself. Drop!");
474
475 throw message_not_sent("Sent message to myself. Drop!");
476 }
477
478 // use relay path?
479 if (message->isRelayed())
480 {
481 next_link = getRelayLinkTo( destination );
482
483 if (next_link != NULL)
484 {
485 next_link->setRelaying();
486
487 // * send message over relayed link *
488 return send_overlaymessage_down(message, next_link->communicationId, priority);
489 }
490 else
491 {
492 logging_warn("No relay hop found to " << destination
493 << " -- trying to route over overlay paths ...")
494 }
495 }
496
497
498 // last resort -> route over overlay path
499 LinkID next_id = overlayInterface->getNextLinkId( destination );
500 if ( next_id.isUnspecified() )
501 {
502 // apperently we're the closest node --> try second best node
503 // NOTE: This is helpful if our routing table is not up-to-date, but
504 // may lead to circles. So we have to be careful.
505 std::vector<const LinkID*> next_ids =
506 overlayInterface->getSortedLinkIdsTowardsNode( destination );
507
508 for ( int i = 0; i < next_ids.size(); i++ )
509 {
510 const LinkID& link = *next_ids[i];
511
512 if ( ! link.isUnspecified() )
513 {
514 next_id = link;
515
516 break;
517 }
518 }
519
520 // still no next hop found. drop.
521 if ( next_id.isUnspecified() )
522 {
523 logging_warn("Could not send message. No next hop found to " <<
524 destination );
525 logging_error("ERROR: " << debugInformation() );
526
527 throw message_not_sent("No next hop found.");
528 }
529 }
530
531
532 /* get link descriptor, do some checks and send message */
533 next_link = getDescriptor(next_id);
534
535 // check pointer
536 if ( next_link == NULL )
537 {
538 // NOTE: this shuldn't happen
539 throw message_not_sent("Could not send message. Link not known.");
540 }
541
542 // avoid circles
543 if ( next_link->remoteNode == last_hop )
544 {
545 // XXX logging_debug
546 logging_info("Possible next hop would create a circle: "
547 << next_link->remoteNode);
548
549 throw message_not_sent("Could not send message. Possible next hop would create a circle.");
550 }
551
552 // check if link is up
553 if ( ! next_link->up)
554 {
555 logging_warn("Could not send message. Link not up");
556 logging_error("ERROR: " << debugInformation() );
557
558 throw message_not_sent("Could not send message. Link not up");
559 }
560
561 // * send message over overlay link *
562 return send(message, next_link, priority);
563}
564
565
566/// send a message using a link descriptor, delivers it to the base overlay class
567seqnum_t BaseOverlay::send( OverlayMsg* message,
568 LinkDescriptor* ldr,
569 uint8_t priority ) throw(message_not_sent)
570{
571 // check if null
572 if (ldr == NULL)
573 {
574 ostringstream out;
575 out << "Can not send message to " << message->getDestinationAddress();
576 throw message_not_sent(out.str());
577 }
578
579 // check if up
580 if ( !ldr->up )
581 {
582 logging_error("DEBUG_INFO: " << debugInformation() );
583
584 ostringstream out;
585 out << "Can not send message. Link not up:" << ldr->to_string();
586 throw message_not_sent(out.str());
587 }
588
589 LinkDescriptor* next_hop_ld = NULL;
590
591 // BRANCH: relayed link
592 if (ldr->relayed)
593 {
594 logging_debug("Resolving direct link for relayed link to "
595 << ldr->remoteNode);
596
597 next_hop_ld = getRelayLinkTo( ldr->remoteNode );
598
599 if (next_hop_ld==NULL)
600 {
601 logging_error("DEBUG_INFO: " << debugInformation() );
602
603 ostringstream out;
604 out << "No relay path found to link: " << ldr;
605 throw message_not_sent(out.str());
606 }
607
608 next_hop_ld->setRelaying();
609 message->setRelayed(true);
610 }
611 // BRANCH: direct link
612 else
613 {
614 next_hop_ld = ldr;
615 }
616
617
618 // check next hop-link
619 if ( ! next_hop_ld->communicationUp)
620 {
621 throw message_not_sent( "send(): Could not send message."
622 " Not a relayed link and direct link is not up." );
623 }
624
625 // send over next link
626 logging_debug("send(): Sending message over direct link.");
627 return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
628
629}
630
631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
632 uint8_t priority, const ServiceID& service) throw(message_not_sent)
633{
634 message->setSourceNode(nodeId);
635 message->setDestinationNode(remote);
636 message->setService(service);
637 return send( message, remote, priority );
638}
639
640void BaseOverlay::send_link( OverlayMsg* message,
641 const LinkID& link,
642 uint8_t priority ) throw(message_not_sent)
643{
644 LinkDescriptor* ld = getDescriptor(link);
645 if (ld==NULL)
646 {
647 throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
648 }
649
650 message->setSourceNode(nodeId);
651 message->setDestinationNode(ld->remoteNode);
652
653 message->setSourceLink(ld->overlayId);
654 message->setDestinationLink(ld->remoteLink);
655
656 message->setService(ld->service);
657 message->setRelayed(ld->relayed);
658
659
660 try
661 {
662 // * send message *
663 send( message, ld, priority );
664 }
665 catch ( message_not_sent& e )
666 {
667 // drop failed link
668 ld->failed = true;
669 dropLink(ld->overlayId);
670 }
671}
672
673// relay route management ------------------------------------------------------
674
675/// stabilize relay information
676void BaseOverlay::stabilizeRelays() {
677 vector<relay_route>::iterator i = relay_routes.begin();
678 while (i!=relay_routes.end() ) {
679 relay_route& route = *i;
680 LinkDescriptor* ld = getDescriptor(route.link);
681
682 // relay link still used and alive?
683 if (ld==NULL
684 || !isLinkDirectVital(ld)
685 || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better?
686 {
687 logging_info("Forgetting relay information to node "
688 << route.node.toString() );
689 i = relay_routes.erase(i);
690 } else
691 i++;
692 }
693}
694
695void BaseOverlay::removeRelayLink( const LinkID& link ) {
696 vector<relay_route>::iterator i = relay_routes.begin();
697 while (i!=relay_routes.end() ) {
698 relay_route& route = *i;
699 if (route.link == link ) i = relay_routes.erase(i); else i++;
700 }
701}
702
703void BaseOverlay::removeRelayNode( const NodeID& remote ) {
704 vector<relay_route>::iterator i = relay_routes.begin();
705 while (i!=relay_routes.end() ) {
706 relay_route& route = *i;
707 if (route.node == remote ) i = relay_routes.erase(i); else i++;
708 }
709}
710
711/// refreshes relay information
712void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
713
714 // handle relayed messages from real links only
715 if (ld == NULL
716 || ld->relayed
717 || message->getSourceNode()==nodeId ) return;
718
719 // update usage information
720 if (message->isRelayed()) {
721 // try to find source node
722 foreach( relay_route& route, relay_routes ) {
723 // relay route found? yes->
724 if ( route.node == message->getDestinationNode() ) {
725 ld->setRelaying();
726 route.used = time(NULL);
727 }
728 }
729
730 }
731
732 // register relay path
733 if (message->isRegisterRelay()) {
734 // set relaying
735 ld->setRelaying();
736
737 // try to find source node
738 foreach( relay_route& route, relay_routes ) {
739
740 // relay route found? yes->
741 if ( route.node == message->getSourceNode() ) {
742
743 // refresh timer
744 route.used = time(NULL);
745 LinkDescriptor* rld = getDescriptor(route.link);
746
747 // route has a shorter hop count or old link is dead? yes-> replace
748 if (route.hops > message->getNumHops()
749 || rld == NULL
750 || !isLinkDirectVital(ld)) {
751 logging_info("Updating relay information to node "
752 << route.node.toString()
753 << " reducing to " << (int) message->getNumHops() << " hops.");
754 route.hops = message->getNumHops();
755 route.link = ld->overlayId;
756 }
757 return;
758 }
759 }
760
761 // not found-> add new entry
762 relay_route route;
763 route.hops = message->getNumHops();
764 route.link = ld->overlayId;
765 route.node = message->getSourceNode();
766 route.used = time(NULL);
767 logging_info("Remembering relay information to node "
768 << route.node.toString());
769 relay_routes.push_back(route);
770 }
771}
772
773/// returns a known "vital" relay link which is up and running
774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
775 // try to find source node
776 foreach( relay_route& route, relay_routes ) {
777 if (route.node == remote ) {
778 LinkDescriptor* ld = getDescriptor( route.link );
779 if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
780 route.used = time(NULL);
781 return ld;
782 }
783 }
784 }
785 return NULL;
786}
787
788/* *****************************************************************************
789 * PUBLIC MEMBERS
790 * ****************************************************************************/
791
792use_logging_cpp(BaseOverlay);
793
794// ----------------------------------------------------------------------------
795
796BaseOverlay::BaseOverlay() :
797 started(false),
798 connected(false),
799 state(BaseOverlayStateInvalid),
800 bc(NULL),
801 nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
802 sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
803 counter(0) {
804}
805
806BaseOverlay::~BaseOverlay() {
807}
808
809// ----------------------------------------------------------------------------
810
811void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
812 logging_info("Starting...");
813
814 // set parameters
815 bc = _basecomm;
816 nodeId = _nodeid;
817
818 // register at base communication
819 bc->registerMessageReceiver( this );
820 bc->registerEventListener( this );
821
822 // timer for auto link management
823 Timer::setInterval( 1000 ); // XXX
824// Timer::setInterval( 10000 );
825 Timer::start();
826
827 started = true;
828 state = BaseOverlayStateInvalid;
829}
830
831void BaseOverlay::stop() {
832 logging_info("Stopping...");
833
834 // stop timer
835 Timer::stop();
836
837 // delete oberlay interface
838 if(overlayInterface != NULL) {
839 delete overlayInterface;
840 overlayInterface = NULL;
841 }
842
843 // unregister at base communication
844 bc->unregisterMessageReceiver( this );
845 bc->unregisterEventListener( this );
846
847 started = false;
848 state = BaseOverlayStateInvalid;
849}
850
851bool BaseOverlay::isStarted(){
852 return started;
853}
854
855// ----------------------------------------------------------------------------
856
857void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
858 const EndpointDescriptor& bootstrapEp) {
859
860 if(id != spovnetId){
861 logging_error("attempt to join against invalid spovnet, call initiate first");
862 return;
863 }
864
865 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
866 logging_info( "Starting to join spovnet " << id.toString() <<
867 " with nodeid " << nodeId.toString());
868
869 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
870
871 //** FIRST STEP - MANDATORY */
872
873 // bootstrap against ourselfs
874 logging_info("joining spovnet locally");
875
876 overlayInterface->joinOverlay();
877 state = BaseOverlayStateCompleted;
878 foreach( NodeListener* i, nodeListeners )
879 i->onJoinCompleted( spovnetId );
880
881 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
882 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
883
884 } else {
885
886 //** SECOND STEP - OPTIONAL */
887
888 // bootstrap against another node
889 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
890
891 const LinkID& lnk = bc->establishLink( bootstrapEp );
892 bootstrapLinks.push_back(lnk);
893 logging_info("join process initiated for " << id.toString() << "...");
894 }
895}
896
897
898void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
899 logging_debug("starting overlay bootstrap module");
900 overlayBootstrap.start(this, spovnetId, nodeId, modules);
901 overlayBootstrap.publish(bc->getEndpointDescriptor());
902}
903
904void BaseOverlay::stopBootstrapModules(){
905 logging_debug("stopping overlay bootstrap module");
906 overlayBootstrap.stop();
907 overlayBootstrap.revoke();
908}
909
910void BaseOverlay::leaveSpoVNet() {
911
912 logging_info( "Leaving spovnet " << spovnetId );
913 bool ret = ( state != this->BaseOverlayStateInvalid );
914
915 logging_debug( "Dropping all auto-links" );
916
917 // gather all service links
918 vector<LinkID> servicelinks;
919 foreach( LinkDescriptor* ld, links )
920 {
921 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
922 servicelinks.push_back( ld->overlayId );
923 }
924
925 // drop all service links
926 foreach( LinkID lnk, servicelinks )
927 {
928 logging_debug("Dropping service link " << lnk.toString());
929 dropLink( lnk );
930 }
931
932 // let the node leave the spovnet overlay interface
933 logging_debug( "Leaving overlay" );
934 if( overlayInterface != NULL )
935 {
936 overlayInterface->leaveOverlay();
937 }
938
939 // drop still open bootstrap links
940 foreach( LinkID lnk, bootstrapLinks )
941 {
942 logging_debug("Dropping bootstrap link " << lnk.toString());
943 bc->dropLink( lnk );
944 }
945
946 // change to inalid state
947 state = BaseOverlayStateInvalid;
948 //ovl.visShutdown( ovlId, nodeId, string("") );
949
950 visualInstance.visShutdown(visualIdOverlay, nodeId, "");
951 visualInstance.visShutdown(visualIdBase, nodeId, "");
952
953 // inform all registered services of the event
954 foreach( NodeListener* i, nodeListeners )
955 {
956 if( ret ) i->onLeaveCompleted( spovnetId );
957 else i->onLeaveFailed( spovnetId );
958 }
959}
960
961void BaseOverlay::createSpoVNet(const SpoVNetID& id,
962 const OverlayParameterSet& param,
963 const SecurityParameterSet& sec,
964 const QoSParameterSet& qos) {
965
966 // set the state that we are an initiator, this way incoming messages are
967 // handled correctly
968 logging_info( "creating spovnet " + id.toString() <<
969 " with nodeid " << nodeId.toString() );
970
971 spovnetId = id;
972
973 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
974 if( overlayInterface == NULL ) {
975 logging_fatal( "overlay structure not supported" );
976 state = BaseOverlayStateInvalid;
977
978 foreach( NodeListener* i, nodeListeners )
979 i->onJoinFailed( spovnetId );
980
981 return;
982 }
983
984 visualInstance.visCreate(visualIdBase, nodeId, "", "");
985 visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
986}
987
988// ----------------------------------------------------------------------------
989
990const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
991 const NodeID& remoteId, const ServiceID& service ) {
992
993 // establish link via overlay
994 if (!remoteId.isUnspecified())
995 return establishLink( remoteId, service );
996 else
997 return establishDirectLink(remoteEp, service );
998}
999
1000/// call base communication's establish link and add link mapping
1001const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
1002 const ServiceID& service ) {
1003
1004 /// find a service listener
1005 if( !communicationListeners.contains( service ) ) {
1006 logging_error( "No listener registered for service id=" << service.toString() );
1007 return LinkID::UNSPECIFIED;
1008 }
1009 CommunicationListener* listener = communicationListeners.get( service );
1010 assert( listener != NULL );
1011
1012 // create descriptor
1013 LinkDescriptor* ld = addDescriptor();
1014 ld->relayed = false;
1015 ld->listener = listener;
1016 ld->service = service;
1017 ld->communicationId = bc->establishLink( ep );
1018
1019 /// establish link and add mapping
1020 logging_info("Establishing direct link " << ld->communicationId.toString()
1021 << " using " << ep.toString());
1022
1023 return ld->communicationId;
1024}
1025
1026/// establishes a link between two arbitrary nodes
1027const LinkID BaseOverlay::establishLink( const NodeID& remote,
1028 const ServiceID& service ) {
1029
1030 // TODO What if we already have a Link to this node and this service id?
1031
1032 // do not establish a link to myself!
1033 if (remote == nodeId) return
1034 LinkID::UNSPECIFIED;
1035
1036
1037 // create a link descriptor
1038 LinkDescriptor* ld = addDescriptor();
1039 ld->relayed = true;
1040 ld->remoteNode = remote;
1041 ld->service = service;
1042 ld->listener = getListener(ld->service);
1043
1044 // initialize sequence numbers
1045 ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
1046 logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
1047
1048 // create link request message
1049 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
1050 msg.setSourceLink(ld->overlayId);
1051
1052 // send over relayed link
1053 msg.setRelayed(true);
1054 msg.setRegisterRelay(true);
1055// msg.setRouteRecord(true);
1056
1057 msg.setSeqNum(ld->last_sent_seqnum);
1058
1059 // debug message
1060 logging_info(
1061 "Sending link request with"
1062 << " link=" << ld->overlayId.toString()
1063 << " node=" << ld->remoteNode.toString()
1064 << " serv=" << ld->service.toString()
1065 );
1066
1067
1068 // sending message to node
1069 try
1070 {
1071 // * send *
1072 seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
1073 }
1074 catch ( message_not_sent& e )
1075 {
1076 logging_warn("Link request not sent: " << e.what());
1077
1078 // Message not sent. Cancel link request.
1079 SystemQueue::instance().scheduleCall(
1080 boost::bind(
1081 &BaseOverlay::__onLinkEstablishmentFailed,
1082 this,
1083 ld->overlayId)
1084 );
1085 }
1086
1087 return ld->overlayId;
1088}
1089
1090/// NOTE: "id" is an Overlay-LinkID
1091void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
1092{
1093 // TODO This code redundant. But also it's not easy to aggregate in one function.
1094
1095 // get descriptor for link
1096 LinkDescriptor* ld = getDescriptor(id, false);
1097 if ( ld == NULL ) return; // not found? ->ignore!
1098
1099 logging_debug( "__onLinkEstablishmentFaild: " << ld );
1100
1101 // removing relay link information
1102 removeRelayLink(ld->overlayId);
1103
1104 // inform listeners about link down
1105 ld->communicationUp = false;
1106 if (!ld->service.isUnspecified())
1107 {
1108 CommunicationListener* lst = getListener(ld->service);
1109 if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
1110 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1111 }
1112
1113 // delete all queued messages (auto links)
1114 if( ld->messageQueue.size() > 0 ) {
1115 logging_warn( "Dropping link " << id.toString() << " that has "
1116 << ld->messageQueue.size() << " waiting messages" );
1117 ld->flushQueue();
1118 }
1119
1120 // erase mapping
1121 eraseDescriptor(ld->overlayId);
1122}
1123
1124
1125/// drops an established link
1126void BaseOverlay::dropLink(const LinkID& link)
1127{
1128 logging_info( "Dropping link: " << link.toString() );
1129
1130 // find the link item to drop
1131 LinkDescriptor* ld = getDescriptor(link);
1132 if( ld == NULL )
1133 {
1134 logging_warn( "Can't drop link, link is unknown!");
1135 return;
1136 }
1137
1138 // delete all queued messages
1139 if( ld->messageQueue.size() > 0 )
1140 {
1141 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
1142 << ld->messageQueue.size() << " waiting messages" );
1143 ld->flushQueue();
1144 }
1145
1146
1147 // inform application and remote note (but only once)
1148 // NOTE: If we initiated the drop, this function is called twice, but on
1149 // the second call, there is noting to do.
1150 if ( ld->up && ! ld->failed )
1151 {
1152 // inform sideport and listener
1153 if(ld->listener != NULL)
1154 {
1155 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
1156 }
1157 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
1158
1159 // send link-close to remote node
1160 logging_info("Sending LinkClose message to remote node.");
1161 OverlayMsg close_msg(OverlayMsg::typeLinkClose);
1162 send_link(&close_msg, link, system_priority::OVERLAY);
1163
1164 // deactivate link
1165 ld->up = false;
1166// ld->closing = true;
1167 }
1168
1169 else if ( ld->failed )
1170 {
1171 // inform listener
1172 if( ld->listener != NULL )
1173 {
1174 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1175 }
1176
1177 ld->up = false;
1178 __removeDroppedLink(ld->overlayId);
1179 }
1180}
1181
1182/// called from typeLinkClose-handler
1183void BaseOverlay::__removeDroppedLink(const LinkID& link)
1184{
1185 // find the link item to drop
1186 LinkDescriptor* ld = getDescriptor(link);
1187 if( ld == NULL )
1188 {
1189 return;
1190 }
1191
1192 // do not drop relay links
1193 if (!ld->relaying)
1194 {
1195 // drop the link in base communication
1196 if (ld->communicationUp)
1197 {
1198 bc->dropLink( ld->communicationId );
1199 }
1200
1201 // erase descriptor
1202 eraseDescriptor( ld->overlayId );
1203 }
1204 else
1205 {
1206 ld->dropAfterRelaying = true;
1207 }
1208}
1209
1210// ----------------------------------------------------------------------------
1211
1212/// internal send message, always use this functions to send messages over links
1213const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
1214 const LinkID& link,
1215 uint8_t priority ) throw(message_not_sent)
1216{
1217 logging_debug( "Sending data message on link " << link.toString() );
1218
1219 // get the mapping for this link
1220 LinkDescriptor* ld = getDescriptor(link);
1221 if( ld == NULL )
1222 {
1223 throw message_not_sent("Could not send message. Link not found id=" + link.toString());
1224 }
1225
1226 // check if the link is up yet, if its an auto link queue message
1227 if( !ld->up )
1228 {
1229 ld->setAutoUsed();
1230 if( ld->autolink )
1231 {
1232 logging_info("Auto-link " << link.toString() << " not up, queue message");
1233
1234 // queue message
1235 LinkDescriptor::message_queue_entry msg;
1236 msg.message = message;
1237 msg.priority = priority;
1238
1239 ld->messageQueue.push_back( msg );
1240
1241 return SequenceNumber::DISABLED; // TODO what to return if message is queued?
1242 }
1243 else
1244 {
1245 throw message_not_sent("Link " + link.toString() + " not up, drop message");
1246 }
1247 }
1248
1249 // TODO XXX ----> coordinate with QUIC-efforts !!
1250 // TODO aktuell: sequence numbers
1251 // TODO seqnum on fast path ?
1252 ld->last_sent_seqnum.increment();
1253
1254 /* choose fast-path for direct links; normal overlay-path otherwise */
1255 // BRANCH: direct link
1256 if ( ld->communicationUp && !ld->relayed )
1257 {
1258 // * send down to BaseCommunication *
1259 try
1260 {
1261 bc->sendMessage(ld->communicationId, message, priority, true);
1262 }
1263 catch ( communication::communication_message_not_sent& e )
1264 {
1265 ostringstream out;
1266 out << "Communication message on fast-path not sent: " << e.what();
1267 throw message_not_sent(out.str());
1268 }
1269 }
1270
1271 // BRANCH: use (slow) overlay-path
1272 else
1273 {
1274 // compile overlay message (has service and node id)
1275 OverlayMsg overmsg( OverlayMsg::typeData );
1276 overmsg.set_payload_message(message);
1277
1278 // set SeqNum
1279 if ( ld->transmit_seqnums )
1280 {
1281 overmsg.setSeqNum(ld->last_sent_seqnum);
1282 }
1283 logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
1284
1285 // send message over relay/direct/overlay
1286 send_link( &overmsg, ld->overlayId, priority );
1287 }
1288
1289 // return seqnum
1290 return ld->last_sent_seqnum;
1291}
1292
1293
1294const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
1295 const NodeID& node, uint8_t priority, const ServiceID& service) {
1296
1297 // find link for node and service
1298 LinkDescriptor* ld = getAutoDescriptor( node, service );
1299
1300 // if we found no link, create an auto link
1301 if( ld == NULL ) {
1302
1303 // debug output
1304 logging_info( "No link to send message to node "
1305 << node.toString() << " found for service "
1306 << service.toString() << ". Creating auto link ..."
1307 );
1308
1309 // call base overlay to create a link
1310 LinkID link = establishLink( node, service );
1311 ld = getDescriptor( link );
1312 if( ld == NULL ) {
1313 logging_error( "Failed to establish auto-link.");
1314 throw message_not_sent("Failed to establish auto-link.");
1315 }
1316 ld->autolink = true;
1317
1318 logging_debug( "Auto-link establishment in progress to node "
1319 << node.toString() << " with link id=" << link.toString() );
1320 }
1321 assert(ld != NULL);
1322
1323 // mark the link as used, as we now send a message through it
1324 ld->setAutoUsed();
1325
1326 // send / queue message
1327 return sendMessage( message, ld->overlayId, priority );
1328}
1329
1330
1331NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
1332 const NodeID& address, uint8_t priority, const ServiceID& service) {
1333
1334 if ( overlayInterface->isClosestNodeTo(address) )
1335 {
1336 return NodeID::UNSPECIFIED;
1337 }
1338
1339 const NodeID& closest_node = overlayInterface->getNextNodeId(address);
1340
1341 if ( closest_node != NodeID::UNSPECIFIED )
1342 {
1343 sendMessage(message, closest_node, priority, service);
1344 }
1345
1346 return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference?
1347}
1348// ----------------------------------------------------------------------------
1349
1350const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1351 const LinkID& link) const {
1352
1353 // return own end-point descriptor
1354 if( link.isUnspecified() )
1355 return bc->getEndpointDescriptor();
1356
1357 // find link descriptor. not found -> return unspecified
1358 const LinkDescriptor* ld = getDescriptor(link);
1359 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1360
1361 // return endpoint-descriptor from base communication
1362 return bc->getEndpointDescriptor( ld->communicationId );
1363}
1364
1365const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1366 const NodeID& node) const {
1367
1368 // return own end-point descriptor
1369 if( node == nodeId || node.isUnspecified() ) {
1370 //logging_info("getEndpointDescriptor: returning self.");
1371 return bc->getEndpointDescriptor();
1372 }
1373
1374 // no joined and request remote descriptor? -> fail!
1375 if( overlayInterface == NULL ) {
1376 logging_error( "Overlay interface not set, cannot resolve end-point." );
1377 return EndpointDescriptor::UNSPECIFIED();
1378 }
1379
1380// // resolve end-point descriptor from the base-overlay routing table
1381// const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1382// if(ep.toString() != "") return ep;
1383
1384 // see if we can find the node in our own table
1385 foreach(const LinkDescriptor* ld, links){
1386 if(ld->remoteNode != node) continue;
1387 if(!ld->communicationUp) continue;
1388 const EndpointDescriptor& ep =
1389 bc->getEndpointDescriptor(ld->communicationId);
1390 if(ep != EndpointDescriptor::UNSPECIFIED()) {
1391 //logging_info("getEndpointDescriptor: using " << ld->to_string());
1392 return ep;
1393 }
1394 }
1395
1396 logging_warn( "No EndpointDescriptor found for node " << node );
1397 logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
1398
1399 return EndpointDescriptor::UNSPECIFIED();
1400}
1401
1402// ----------------------------------------------------------------------------
1403
1404bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1405 sideport = _sideport;
1406 _sideport->configure( this );
1407 return true;
1408}
1409
1410bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1411 sideport = &SideportListener::DEFAULT;
1412 return true;
1413}
1414
1415// ----------------------------------------------------------------------------
1416
1417bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1418 logging_debug( "binding communication listener " << listener
1419 << " on serviceid " << sid.toString() );
1420
1421 if( communicationListeners.contains( sid ) ) {
1422 logging_error( "some listener already registered for service id "
1423 << sid.toString() );
1424 return false;
1425 }
1426
1427 communicationListeners.registerItem( listener, sid );
1428 return true;
1429}
1430
1431
1432bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1433 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1434
1435 if( !communicationListeners.contains( sid ) ) {
1436 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1437 return false;
1438 }
1439
1440 if( communicationListeners.get(sid) != listener ) {
1441 logging_warn( "listener bound to service id " << sid.toString()
1442 << " is different than listener trying to unbind" );
1443 return false;
1444 }
1445
1446 communicationListeners.unregisterItem( sid );
1447 return true;
1448}
1449
1450// ----------------------------------------------------------------------------
1451
1452bool BaseOverlay::bind(NodeListener* listener) {
1453 logging_debug( "Binding node listener " << listener );
1454
1455 // already bound? yes-> warning
1456 NodeListenerVector::iterator i =
1457 find( nodeListeners.begin(), nodeListeners.end(), listener );
1458 if( i != nodeListeners.end() ) {
1459 logging_warn("Node listener " << listener << " is already bound!" );
1460 return false;
1461 }
1462
1463 // no-> add
1464 nodeListeners.push_back( listener );
1465 return true;
1466}
1467
1468bool BaseOverlay::unbind(NodeListener* listener) {
1469 logging_debug( "Unbinding node listener " << listener );
1470
1471 // already unbound? yes-> warning
1472 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1473 if( i == nodeListeners.end() ) {
1474 logging_warn( "Node listener " << listener << " is not bound!" );
1475 return false;
1476 }
1477
1478 // no-> remove
1479 nodeListeners.erase( i );
1480 return true;
1481}
1482
1483// ----------------------------------------------------------------------------
1484
1485void BaseOverlay::onLinkUp(const LinkID& id,
1486 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
1487{
1488 logging_debug( "Link up with base communication link id=" << id );
1489
1490 // get descriptor for link
1491 LinkDescriptor* ld = getDescriptor(id, true);
1492
1493 // BRANCH: handle bootstrap link we initiated
1494 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1495 logging_info(
1496 "Join has been initiated by me and the link is now up. " <<
1497 "LinkID: " << id.toString() <<
1498 "Sending out join request for SpoVNet " << spovnetId.toString()
1499 );
1500
1501 // send join request message
1502 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1503 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1504 JoinRequest joinRequest( spovnetId, nodeId );
1505 overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
1506
1507 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1508
1509 return;
1510 }
1511
1512 // BRANCH: link establishment from remote, add one!
1513 if (ld == NULL) {
1514 ld = addDescriptor( id );
1515 logging_info( "onLinkUp (remote request) descriptor: " << ld );
1516
1517 // update descriptor
1518 ld->fromRemote = true;
1519 ld->communicationId = id;
1520 ld->communicationUp = true;
1521 ld->setAutoUsed();
1522 ld->setAlive();
1523
1524 // in this case, do not inform listener, since service it unknown
1525 // -> wait for update message!
1526 }
1527
1528 // BRANCH: We requested this link in the first place
1529 else
1530 {
1531 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1532
1533 // update descriptor
1534 ld->setAutoUsed();
1535 ld->setAlive();
1536 ld->communicationUp = true;
1537 ld->fromRemote = false;
1538
1539 // BRANCH: this was a relayed link before --> convert to direct link
1540 // TODO do we really have to send a message here?
1541 if (ld->relayed)
1542 {
1543 ld->up = true;
1544 ld->relayed = false;
1545 logging_info( "Converting to direct link: " << ld );
1546
1547 // send message
1548 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1549 overMsg.setSourceLink( ld->overlayId );
1550 overMsg.setDestinationLink( ld->remoteLink );
1551 send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
1552
1553 // inform listener
1554 if( ld->listener != NULL)
1555 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1556 }
1557
1558
1559 /* NOTE: Chord is opening direct-links in it's setup routine which are
1560 * neither set to "relayed" nor to "up". To activate these links a
1561 * typeLinkUpdate must be sent.
1562 *
1563 * This branch is would also be taken when we had a working link before
1564 * (ld->up == true). I'm not sure if this case does actually happen
1565 * and whether it's tested.
1566 */
1567 else
1568 {
1569 // note: necessary to validate the link on the remote side!
1570 logging_info( "Sending out update" <<
1571 " for service " << ld->service.toString() <<
1572 " with local node id " << nodeId.toString() <<
1573 " on link " << ld->overlayId.toString() );
1574
1575 // compile and send update message
1576 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1577 overlayMsg.setAutoLink( ld->autolink );
1578 overlayMsg.setSourceNode(nodeId);
1579 overlayMsg.setDestinationNode(ld->remoteNode);
1580 overlayMsg.setSourceLink(ld->overlayId);
1581 overlayMsg.setDestinationLink(ld->remoteLink);
1582 overlayMsg.setService(ld->service);
1583 overlayMsg.setRelayed(false);
1584
1585 // TODO ld->communicationId = id ??
1586
1587 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1588 }
1589 }
1590}
1591
1592void BaseOverlay::onLinkDown(const LinkID& id,
1593 const addressing2::EndpointPtr local,
1594 const addressing2::EndpointPtr remote)
1595{
1596 // erase bootstrap links
1597 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1598 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1599
1600 // get descriptor for link
1601 LinkDescriptor* ld = getDescriptor(id, true);
1602 if ( ld == NULL ) return; // not found? ->ignore!
1603 logging_info( "onLinkDown descriptor: " << ld );
1604
1605 // removing relay link information
1606 removeRelayLink(ld->overlayId);
1607
1608 // inform listeners about link down
1609 ld->communicationUp = false;
1610 if (!ld->service.isUnspecified()) {
1611 CommunicationListener* lst = getListener(ld->service);
1612 if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1613 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1614 }
1615
1616 // delete all queued messages (auto links)
1617 if( ld->messageQueue.size() > 0 ) {
1618 logging_warn( "Dropping link " << id.toString() << " that has "
1619 << ld->messageQueue.size() << " waiting messages" );
1620 ld->flushQueue();
1621 }
1622
1623 // erase mapping
1624 eraseDescriptor(ld->overlayId);
1625
1626
1627 // notify the application if this is the last link to a different node
1628 if ( connected )
1629 {
1630 bool active_links = false;
1631
1632 // look for links that are still active
1633 foreach( LinkDescriptor* ld, links )
1634 {
1635 if ( isLinkDirectVital(ld) )
1636 {
1637 active_links = true;
1638 break;
1639 }
1640 }
1641
1642 if ( ! active_links )
1643 {
1644 connected = false;
1645
1646 foreach( NodeListener* i, nodeListeners )
1647 i->onOverlayDisconnected( spovnetId );
1648 }
1649 }
1650
1651}
1652
1653
1654void BaseOverlay::onLinkFail(const LinkID& id,
1655 const addressing2::EndpointPtr local,
1656 const addressing2::EndpointPtr remote)
1657{
1658 logging_debug( "Link fail with base communication link id=" << id );
1659
1660// // erase bootstrap links
1661// vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1662// if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1663//
1664// // get descriptor for link
1665// LinkDescriptor* ld = getDescriptor(id, true);
1666// if ( ld == NULL ) return; // not found? ->ignore!
1667// logging_debug( "Link failed id=" << ld->overlayId.toString() );
1668//
1669// // inform listeners
1670// ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1671// sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1672
1673 logging_debug( " ... calling onLinkDown ..." );
1674 onLinkDown(id, local, remote);
1675}
1676
1677
1678void BaseOverlay::onLinkChanged(const LinkID& id,
1679 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal,
1680 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
1681{
1682 // get descriptor for link
1683 LinkDescriptor* ld = getDescriptor(id, true);
1684 if ( ld == NULL ) return; // not found? ->ignore!
1685 logging_debug( "onLinkChanged descriptor: " << ld );
1686
1687 // inform listeners
1688 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1689 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1690
1691 // autolinks: refresh timestamp
1692 ld->setAutoUsed();
1693}
1694
1695//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
1696// const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
1697// const QoSParameterSet& qos)
1698//{
1699// logging_debug( "Link quality changed with base communication link id=" << id );
1700//
1701// // get descriptor for link
1702// LinkDescriptor* ld = getDescriptor(id, true);
1703// if ( ld == NULL ) return; // not found? ->ignore!
1704// logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1705//}
1706
1707bool BaseOverlay::onLinkRequest(const LinkID& id,
1708 const addressing2::EndpointPtr local,
1709 const addressing2::EndpointPtr remote)
1710{
1711 logging_debug("Accepting link request from " << remote->to_string() );
1712
1713 // TODO ask application..?
1714
1715 return true;
1716}
1717
1718
1719
1720
1721/// handles a message from base communication
1722bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
1723 const LinkID& link,
1724 const NodeID&,
1725 bool bypass_overlay )
1726{
1727 // get descriptor for link
1728 LinkDescriptor* ld = getDescriptor( link, true );
1729
1730
1731 /* choose fastpath for direct links; normal overlay-path otherwise */
1732 if ( bypass_overlay && ld )
1733 {
1734 // message received --> link is alive
1735 ld->keepAliveReceived = time(NULL);
1736 // hop count on this link
1737 ld->hops = 0;
1738
1739
1740 // hand over to CommunicationListener (aka Application)
1741 CommunicationListener* lst = getListener(ld->service);
1742 if ( lst != NULL )
1743 {
1744 lst->onMessage(
1745 message,
1746 ld->remoteNode,
1747 ld->overlayId,
1748 SequenceNumber::DISABLED,
1749 NULL );
1750
1751 return true;
1752 }
1753
1754 return false;
1755 }
1756 else
1757 {
1758 return handleMessage( message, ld, link );
1759 }
1760}
1761
1762// ----------------------------------------------------------------------------
1763
1764/// Handle spovnet instance join requests
1765bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
1766{
1767 // decapsulate message
1768 JoinRequest joinReq;
1769 joinReq.deserialize_from_shared_buffer(message);
1770
1771 logging_info( "Received join request for spovnet " <<
1772 joinReq.getSpoVNetID().toString() );
1773
1774 // check spovnet id
1775 if( joinReq.getSpoVNetID() != spovnetId ) {
1776 logging_error(
1777 "Received join request for spovnet we don't handle " <<
1778 joinReq.getSpoVNetID().toString() );
1779
1780 return false;
1781 }
1782
1783 // TODO: here you can implement mechanisms to deny joining of a node
1784 bool allow = true;
1785 logging_info( "Sending join reply for spovnet " <<
1786 spovnetId.toString() << " to node " <<
1787 source.toString() <<
1788 ". Result: " << (allow ? "allowed" : "denied") );
1789 joiningNodes.push_back( source );
1790
1791 // return overlay parameters
1792 assert( overlayInterface != NULL );
1793 logging_debug( "Using bootstrap end-point "
1794 << getEndpointDescriptor().toString() )
1795 OverlayParameterSet parameters = overlayInterface->getParameters();
1796
1797
1798 // create JoinReplay Message
1799 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1800 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1801 JoinReply replyMsg( spovnetId, parameters, allow );
1802 retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
1803
1804 // XXX This is unlovely clash between the old message system and the new one,
1805 // but a.t.m. we can't migrate everything to the new system at once..
1806 // ---> Consider the EndpointDescriptor as part of the JoinReply..
1807 retmsg.append_buffer(getEndpointDescriptor().serialize());
1808
1809 // * send *
1810 send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
1811
1812 return true;
1813}
1814
1815/// Handle replies to spovnet instance join requests
1816bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
1817{
1818 // decapsulate message
1819 logging_debug("received join reply message");
1820 JoinReply replyMsg;
1821 EndpointDescriptor endpoints;
1822 reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
1823 buff = endpoints.deserialize(buff);
1824
1825 // correct spovnet?
1826 if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
1827 logging_error( "Received SpoVNet join reply for " <<
1828 replyMsg.getSpoVNetID().toString() <<
1829 " != " << spovnetId.toString() );
1830
1831 return false;
1832 }
1833
1834 // access granted? no -> fail
1835 if( !replyMsg.getJoinAllowed() ) {
1836 logging_error( "Our join request has been denied" );
1837
1838 // drop initiator link
1839 if( !bcLink.isUnspecified() ){
1840 bc->dropLink( bcLink );
1841
1842 vector<LinkID>::iterator it = std::find(
1843 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1844 if( it != bootstrapLinks.end() )
1845 bootstrapLinks.erase(it);
1846 }
1847
1848 // inform all registered services of the event
1849 foreach( NodeListener* i, nodeListeners )
1850 i->onJoinFailed( spovnetId );
1851
1852 return true;
1853 }
1854
1855 // access has been granted -> continue!
1856 logging_info("Join request has been accepted for spovnet " <<
1857 spovnetId.toString() );
1858
1859 logging_debug( "Using bootstrap end-point "
1860 << endpoints.toString() );
1861
1862 // create overlay structure from spovnet parameter set
1863 // if we have not boostrapped yet against some other node
1864 if( overlayInterface == NULL ){
1865
1866 logging_debug("first-time bootstrapping");
1867
1868 overlayInterface = OverlayFactory::create(
1869 *this, replyMsg.getParam(), nodeId, this );
1870
1871 // overlay structure supported? no-> fail!
1872 if( overlayInterface == NULL ) {
1873 logging_error( "overlay structure not supported" );
1874
1875 if( !bcLink.isUnspecified() ){
1876 bc->dropLink( bcLink );
1877
1878 vector<LinkID>::iterator it = std::find(
1879 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1880 if( it != bootstrapLinks.end() )
1881 bootstrapLinks.erase(it);
1882 }
1883
1884 // inform all registered services of the event
1885 foreach( NodeListener* i, nodeListeners )
1886 i->onJoinFailed( spovnetId );
1887
1888 return true;
1889 }
1890
1891 // everything ok-> join the overlay!
1892 state = BaseOverlayStateCompleted;
1893 overlayInterface->createOverlay();
1894
1895 overlayInterface->joinOverlay( endpoints );
1896 overlayBootstrap.recordJoin( endpoints );
1897
1898 // update ovlvis
1899 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1900
1901 // inform all registered services of the event
1902 foreach( NodeListener* i, nodeListeners )
1903 i->onJoinCompleted( spovnetId );
1904 }
1905 else
1906 {
1907 // this is not the first bootstrap, just join the additional node
1908 logging_debug("not first-time bootstrapping");
1909 overlayInterface->joinOverlay( endpoints );
1910 overlayBootstrap.recordJoin( endpoints );
1911 } // if( overlayInterface == NULL )
1912
1913 return true;
1914}
1915
1916
1917bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
1918{
1919 // get service
1920 const ServiceID& service = ld->service; //overlayMsg->getService();
1921
1922 logging_debug( "Received data for service " << service.toString()
1923 << " on link " << overlayMsg->getDestinationLink().toString() );
1924
1925 // delegate data message
1926 CommunicationListener* lst = getListener(service);
1927 if(lst != NULL){
1928 lst->onMessage(
1929 message,
1930// overlayMsg->getSourceNode(),
1931// overlayMsg->getDestinationLink(),
1932 ld->remoteNode,
1933 ld->overlayId,
1934 overlayMsg->getSeqNum(),
1935 overlayMsg
1936 );
1937 }
1938
1939 return true;
1940}
1941
1942bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
1943{
1944 /**
1945 * Deserialize MessageLost-Message
1946 *
1947 * - Type of lost message
1948 * - Hop count of lost message
1949 * - Source-LinkID of lost message
1950 */
1951 const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
1952 uint8_t type = buff[0];
1953 uint8_t hops = buff[1];
1954 LinkID linkid;
1955 linkid.deserialize(message(sizeof(uint8_t)*2));
1956
1957 logging_warn("Node " << msg->getSourceNode()
1958 << " informed us, that our message of type " << (int) type
1959 << " is lost after traveling " << (int) hops << " hops."
1960 << " (LinkID: " << linkid.toString());
1961
1962
1963 // TODO switch-case ?
1964
1965 // BRANCH: LinkRequest --> link request failed
1966 if ( type == OverlayMsg::typeLinkRequest )
1967 {
1968 __onLinkEstablishmentFailed(linkid);
1969 }
1970
1971 // BRANCH: Data --> link disrupted. Drop link.
1972 // (We could use something more advanced here. e.g. At least send a
1973 // keep-alive message and wait for a keep-alive reply.)
1974 if ( type == OverlayMsg::typeData )
1975 {
1976 LinkDescriptor* link_desc = getDescriptor(linkid);
1977
1978 if ( link_desc )
1979 {
1980 link_desc->failed = true;
1981 }
1982
1983 dropLink(linkid);
1984 }
1985
1986 // BRANCH: ping lost
1987 if ( type == OverlayMsg::typePing )
1988 {
1989 CommunicationListener* lst = getListener(msg->getService());
1990 if( lst != NULL )
1991 {
1992 lst->onPingLost(msg->getSourceNode());
1993 }
1994 }
1995
1996 return true;
1997}
1998
1999bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2000{
2001 // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
2002
2003 bool send_pong = false;
2004
2005 // inform application and ask permission to send a pong message
2006 CommunicationListener* lst = getListener(overlayMsg->getService());
2007 if( lst != NULL )
2008 {
2009 send_pong = lst->onPing(overlayMsg->getSourceNode());
2010 }
2011
2012 // send pong message if allowed
2013 if ( send_pong )
2014 {
2015 OverlayMsg pong_msg(OverlayMsg::typePong);
2016 pong_msg.setSeqNum(overlayMsg->getSeqNum());
2017
2018 // send message
2019 try
2020 {
2021 send_node( &pong_msg,
2022 overlayMsg->getSourceNode(),
2023 system_priority::OVERLAY,
2024 overlayMsg->getService() );
2025 }
2026 catch ( message_not_sent& e )
2027 {
2028 logging_info("Could not send Pong-Message to node: " <<
2029 overlayMsg->getSourceNode());
2030 }
2031 }
2032}
2033
2034bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2035{
2036 // inform application
2037 CommunicationListener* lst = getListener(overlayMsg->getService());
2038 if( lst != NULL )
2039 {
2040 lst->onPong(overlayMsg->getSourceNode());
2041 }
2042}
2043
2044bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2045
2046 if( ld == NULL ) {
2047 logging_warn( "received overlay update message for link for "
2048 << "which we have no mapping" );
2049 return false;
2050 }
2051 logging_info("Received type update message on link " << ld );
2052
2053 // update our link mapping information for this link
2054 bool changed =
2055 ( ld->remoteNode != overlayMsg->getSourceNode() )
2056 || ( ld->service != overlayMsg->getService() );
2057
2058 // set parameters
2059 ld->up = true;
2060 ld->remoteNode = overlayMsg->getSourceNode();
2061 ld->remoteLink = overlayMsg->getSourceLink();
2062 ld->service = overlayMsg->getService();
2063 ld->autolink = overlayMsg->isAutoLink();
2064
2065 // if our link information changed, we send out an update, too
2066 if( changed ) {
2067 overlayMsg->swapRoles();
2068 overlayMsg->setSourceNode(nodeId);
2069 overlayMsg->setSourceLink(ld->overlayId);
2070 overlayMsg->setService(ld->service);
2071 send( overlayMsg, ld, system_priority::OVERLAY );
2072 }
2073
2074 // service registered? no-> error!
2075 if( !communicationListeners.contains( ld->service ) ) {
2076 logging_warn( "Link up: event listener has not been registered" );
2077 return false;
2078 }
2079
2080 // default or no service registered?
2081 CommunicationListener* listener = communicationListeners.get( ld->service );
2082 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
2083 logging_warn("Link up: event listener is default or null!" );
2084 return true;
2085 }
2086
2087 // update descriptor
2088 ld->listener = listener;
2089 ld->setAutoUsed();
2090 ld->setAlive();
2091
2092 // ask the service whether it wants to accept this link
2093 if( !listener->onLinkRequest(ld->remoteNode) ) {
2094
2095 logging_debug("Link id=" << ld->overlayId.toString() <<
2096 " has been denied by service " << ld->service.toString() << ", dropping link");
2097
2098 // prevent onLinkDown calls to the service
2099 ld->listener = &CommunicationListener::DEFAULT;
2100
2101 // drop the link
2102 dropLink( ld->overlayId );
2103 return true;
2104 }
2105
2106 // set link up
2107 ld->up = true;
2108 logging_info( "Link has been accepted by service and is up: " << ld );
2109
2110 // auto links: link has been accepted -> send queued messages
2111 if( ld->messageQueue.size() > 0 ) {
2112 logging_info( "Sending out queued messages on link " << ld );
2113 foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
2114 {
2115 sendMessage( msg.message, ld->overlayId, msg.priority );
2116 }
2117 ld->messageQueue.clear();
2118 }
2119
2120 // call the notification functions
2121 listener->onLinkUp( ld->overlayId, ld->remoteNode );
2122 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
2123
2124
2125 // notify the application if this is the first link to a different node
2126 if ( not connected )
2127 {
2128 connected = true;
2129
2130 foreach( NodeListener* i, nodeListeners )
2131 i->onOverlayConnected( spovnetId );
2132 }
2133
2134 return true;
2135}
2136
2137/// handle a link request and reply
2138bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2139
2140 //TODO: Check if a request has already been sent using getSourceLink() ...
2141
2142 // create link descriptor
2143 LinkDescriptor* ldn = addDescriptor();
2144
2145 // flags
2146 ldn->up = true;
2147 ldn->fromRemote = true;
2148 ldn->relayed = true;
2149
2150 // parameters
2151 ldn->service = overlayMsg->getService();
2152 ldn->listener = getListener(ldn->service);
2153 ldn->remoteNode = overlayMsg->getSourceNode();
2154 ldn->remoteLink = overlayMsg->getSourceLink();
2155 ldn->hops = overlayMsg->getNumHops();
2156
2157 // initialize sequence numbers
2158 ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
2159 logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
2160
2161
2162 // update time-stamps
2163 ldn->setAlive();
2164 ldn->setAutoUsed();
2165
2166 logging_info( "Link request received from node id="
2167 << overlayMsg->getSourceNode()
2168 << " LINK: "
2169 << ldn);
2170
2171 // create reply message and send back!
2172 overlayMsg->swapRoles(); // swap source/destination
2173 overlayMsg->setType(OverlayMsg::typeLinkReply);
2174 overlayMsg->setSourceLink(ldn->overlayId);
2175 overlayMsg->setRelayed(true);
2176// overlayMsg->setRouteRecord(true);
2177 overlayMsg->setSeqNum(ld->last_sent_seqnum);
2178
2179 // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
2180 // append our endpoints (for creation of a direct link)
2181 overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
2182
2183 send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
2184
2185 // inform listener
2186 if(ldn != NULL && ldn->listener != NULL)
2187 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2188
2189 return true;
2190}
2191
2192bool BaseOverlay::handleLinkReply(
2193 OverlayMsg* overlayMsg,
2194 reboost::shared_buffer_t sub_message,
2195 LinkDescriptor* ld )
2196{
2197 // deserialize EndpointDescriptor
2198 EndpointDescriptor endpoints;
2199 endpoints.deserialize(sub_message);
2200
2201 // find link request
2202 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
2203
2204 // not found? yes-> drop with error!
2205 if (ldn == NULL) {
2206 logging_error( "No link request pending for "
2207 << overlayMsg->getDestinationLink().toString() );
2208 return false;
2209 }
2210 logging_debug("Handling link reply for " << ldn )
2211
2212 // check if already up
2213 if (ldn->up) {
2214 logging_warn( "Link already up: " << ldn );
2215 return true;
2216 }
2217
2218 // debug message
2219 logging_info( "Link request reply received. Establishing link"
2220 << " for service " << overlayMsg->getService().toString()
2221 << " with local id=" << overlayMsg->getDestinationLink()
2222 << " and remote link id=" << overlayMsg->getSourceLink()
2223 << " to " << endpoints.toString()
2224 << " hop count: " << overlayMsg->getRouteRecord().size()
2225 );
2226
2227 // set local link descriptor data
2228 ldn->up = true;
2229 ldn->relayed = true;
2230 ldn->service = overlayMsg->getService();
2231 ldn->listener = getListener(ldn->service);
2232 ldn->remoteLink = overlayMsg->getSourceLink();
2233 ldn->remoteNode = overlayMsg->getSourceNode();
2234
2235 // update timestamps
2236 ldn->setAlive();
2237 ldn->setAutoUsed();
2238
2239 // auto links: link has been accepted -> send queued messages
2240 if( ldn->messageQueue.size() > 0 ) {
2241 logging_info( "Sending out queued messages on link " <<
2242 ldn->overlayId.toString() );
2243 foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
2244 {
2245 sendMessage( msg.message, ldn->overlayId, msg.priority );
2246 }
2247 ldn->messageQueue.clear();
2248 }
2249
2250 // inform listeners about new link
2251 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2252
2253 // try to replace relay link with direct link
2254 ldn->retryCounter = 3;
2255 ldn->endpoint = endpoints;
2256 ldn->communicationId = bc->establishLink( ldn->endpoint );
2257
2258 return true;
2259}
2260
2261/// handle a keep-alive message for a link
2262bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2263{
2264 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
2265
2266 if ( rld != NULL )
2267 {
2268 logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
2269 if (overlayMsg->isRouteRecord())
2270 {
2271 rld->routeRecord = overlayMsg->getRouteRecord();
2272 }
2273
2274 // set alive
2275 rld->setAlive();
2276
2277
2278 /* answer keep alive */
2279 if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
2280 {
2281 time_t now = time(NULL);
2282 logging_debug("[BaseOverlay] Answering KeepAlive over "
2283 << ld->to_string()
2284 << " after "
2285 << difftime( now, ld->keepAliveSent )
2286 << "s");
2287
2288 OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
2289 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
2290 msg.setRouteRecord(true);
2291 ld->keepAliveSent = now;
2292 send_link( &msg, ld->overlayId, system_priority::OVERLAY );
2293 }
2294
2295 return true;
2296 }
2297 else
2298 {
2299 logging_error("No Keep-Alive for "
2300 << overlayMsg->getDestinationLink() << ": link unknown." );
2301 return false;
2302 }
2303}
2304
2305/// handle a direct link message
2306bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2307 logging_debug( "Received direct link replacement request" );
2308
2309 /// get destination overlay link
2310 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
2311 if (rld == NULL || ld == NULL) {
2312 logging_error("Direct link replacement: Link "
2313 << overlayMsg->getDestinationLink() << "not found error." );
2314 return false;
2315 }
2316 logging_info( "Received direct link convert notification for " << rld );
2317
2318 // update information
2319 rld->communicationId = ld->communicationId;
2320 rld->communicationUp = true;
2321 rld->relayed = false;
2322
2323 // mark used and alive!
2324 rld->setAlive();
2325 rld->setAutoUsed();
2326
2327 // erase the original descriptor
2328 eraseDescriptor(ld->overlayId);
2329
2330 // inform listener
2331 if( rld->listener != NULL)
2332 rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
2333
2334 return true;
2335}
2336
2337/// handles an incoming message
2338bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
2339 const LinkID bcLink )
2340{
2341 // decapsulate overlay message
2342 OverlayMsg* overlayMsg = new OverlayMsg();
2343 reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
2344
2345// // XXX debug
2346// logging_info( "Received overlay message."
2347// << " Hops: " << (int) overlayMsg->getNumHops()
2348// << " Type: " << (int) overlayMsg->getType()
2349// << " Payload size: " << sub_buff.size()
2350// << " SeqNum: " << overlayMsg->getSeqNum() );
2351
2352
2353 // increase number of hops
2354 overlayMsg->increaseNumHops();
2355
2356 // refresh relay information
2357 refreshRelayInformation( overlayMsg, ld );
2358
2359 // update route record
2360 overlayMsg->addRouteRecord(nodeId);
2361
2362 // handle signaling messages (do not route!)
2363 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
2364 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
2365 {
2366 overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
2367 delete overlayMsg;
2368 return true;
2369 }
2370
2371 // message for reached destination? no-> route message
2372 if (!overlayMsg->getDestinationNode().isUnspecified() &&
2373 overlayMsg->getDestinationNode() != nodeId ) {
2374 logging_debug("Routing message "
2375 << " from " << overlayMsg->getSourceNode()
2376 << " to " << overlayMsg->getDestinationNode()
2377 );
2378
2379// // XXX testing AKTUELL
2380// logging_info("MARIO: Routing message "
2381// << " from " << overlayMsg->getSourceNode()
2382// << " to " << overlayMsg->getDestinationNode() );
2383// logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
2384 overlayMsg->append_buffer(sub_buff);
2385
2386 route( overlayMsg, ld->remoteNode );
2387 delete overlayMsg;
2388 return true;
2389 }
2390
2391
2392 /* handle base overlay message */
2393 bool ret = false; // return value
2394 try
2395 {
2396 switch ( overlayMsg->getType() )
2397 {
2398 // data transport messages
2399 case OverlayMsg::typeData:
2400 {
2401 // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
2402 LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
2403
2404 if ( ! end_to_end_ld )
2405 {
2406 logging_warn("Error: Data-Message claims to belong to a link we don't know.");
2407
2408 ret = false;
2409 }
2410 else
2411 {
2412 // message received --> link is alive
2413 end_to_end_ld->keepAliveReceived = time(NULL);
2414 // hop count on this link
2415 end_to_end_ld->hops = overlayMsg->getNumHops();
2416
2417 // * call handler *
2418 ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
2419 }
2420
2421 break;
2422 }
2423 case OverlayMsg::typeMessageLost:
2424 ret = handleLostMessage(sub_buff, overlayMsg);
2425
2426 break;
2427
2428 // overlay setup messages
2429 case OverlayMsg::typeJoinRequest:
2430 ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break;
2431 case OverlayMsg::typeJoinReply:
2432 ret = handleJoinReply(sub_buff, bcLink ); break;
2433
2434 // link specific messages
2435 case OverlayMsg::typeLinkRequest:
2436 ret = handleLinkRequest(overlayMsg, ld ); break;
2437 case OverlayMsg::typeLinkReply:
2438 ret = handleLinkReply(overlayMsg, sub_buff, ld ); break;
2439 case OverlayMsg::typeLinkUpdate:
2440 ret = handleLinkUpdate(overlayMsg, ld ); break;
2441 case OverlayMsg::typeKeepAlive:
2442 case OverlayMsg::typeKeepAliveReply:
2443 ret = handleLinkAlive(overlayMsg, ld ); break;
2444 case OverlayMsg::typeLinkDirect:
2445 ret = handleLinkDirect(overlayMsg, ld ); break;
2446
2447 case OverlayMsg::typeLinkClose:
2448 {
2449 dropLink(overlayMsg->getDestinationLink());
2450 __removeDroppedLink(overlayMsg->getDestinationLink());
2451
2452 break;
2453 }
2454
2455 /// ping over overlay path (or similar)
2456 case OverlayMsg::typePing:
2457 {
2458 ret = handlePing(overlayMsg, ld);
2459 break;
2460 }
2461 case OverlayMsg::typePong:
2462 {
2463 ret = handlePong(overlayMsg, ld);
2464 break;
2465 }
2466
2467 // handle unknown message type
2468 default:
2469 {
2470 logging_error( "received message in invalid state! don't know " <<
2471 "what to do with this message of type " << overlayMsg->getType() );
2472 ret = false;
2473 break;
2474 }
2475 }
2476 }
2477 catch ( reboost::illegal_sub_buffer& e )
2478 {
2479 logging_error( "Failed to create sub-buffer while reading message: »"
2480 << e.what()
2481 << "« Message too short? ");
2482
2483 assert(false); // XXX
2484 }
2485
2486 // free overlay message and return value
2487 delete overlayMsg;
2488 return ret;
2489}
2490
2491// ----------------------------------------------------------------------------
2492
2493void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
2494
2495 logging_debug( "broadcasting message to all known nodes " <<
2496 "in the overlay from service " + service.toString() );
2497
2498 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
2499 for(size_t i=0; i<nodes.size(); i++){
2500 NodeID& id = nodes.at(i);
2501 if(id == this->nodeId) continue; // don't send to ourselfs
2502
2503 sendMessage( message, id, priority, service );
2504 }
2505}
2506
2507/// return the overlay neighbors
2508vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
2509 // the known nodes _can_ also include our node, so we remove ourself
2510 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
2511 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
2512 if( i != nodes.end() ) nodes.erase( i );
2513 return nodes;
2514}
2515
2516const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
2517 if( lid == LinkID::UNSPECIFIED ) return nodeId;
2518 const LinkDescriptor* ld = getDescriptor(lid);
2519 if( ld == NULL ) return NodeID::UNSPECIFIED;
2520 else return ld->remoteNode;
2521}
2522
2523vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
2524 vector<LinkID> linkvector;
2525 foreach( LinkDescriptor* ld, links ) {
2526 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
2527 linkvector.push_back( ld->overlayId );
2528 }
2529 }
2530 return linkvector;
2531}
2532
2533
2534void BaseOverlay::onNodeJoin(const NodeID& node) {
2535 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
2536 if( i == joiningNodes.end() ) return;
2537
2538 logging_info( "node has successfully joined baseoverlay and overlay structure "
2539 << node.toString() );
2540
2541 joiningNodes.erase( i );
2542}
2543
2544void BaseOverlay::eventFunction() {
2545 stabilizeRelays();
2546 stabilizeLinks();
2547 updateVisual();
2548}
2549
2550
2551
2552/* link status */
2553bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
2554{
2555 const LinkDescriptor* ld = getDescriptor(lnk);
2556
2557 if (!ld)
2558 return false;
2559
2560 return ld->communicationUp && !ld->relayed;
2561}
2562
2563int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
2564{
2565 const LinkDescriptor* ld = getDescriptor(lnk);
2566
2567 if (!ld)
2568 return -1;
2569
2570 return ld->hops;
2571}
2572
2573
2574bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
2575{
2576 time_t now = time(NULL);
2577
2578 return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
2579}
2580
2581bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
2582{
2583 return isLinkVital(link) && link->communicationUp && !link->relayed;
2584}
2585
2586/* [link status] */
2587
2588
2589void BaseOverlay::updateVisual(){
2590
2591 //
2592 // update base overlay structure
2593 //
2594
2595 static NodeID pre = NodeID::UNSPECIFIED;
2596 static NodeID suc = NodeID::UNSPECIFIED;
2597
2598 vector<NodeID> nodes = this->getOverlayNeighbors(false);
2599
2600 if(nodes.size() == 0){
2601
2602 if(pre != NodeID::UNSPECIFIED){
2603 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2604 pre = NodeID::UNSPECIFIED;
2605 }
2606 if(suc != NodeID::UNSPECIFIED){
2607 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2608 suc = NodeID::UNSPECIFIED;
2609 }
2610
2611 } // if(nodes.size() == 0)
2612
2613 if(nodes.size() == 1){
2614 // only one node, make this pre and succ
2615 // and then go into the node.size()==2 case
2616 //nodes.push_back(nodes.at(0));
2617
2618 if(pre != nodes.at(0)){
2619 pre = nodes.at(0);
2620 if(pre != NodeID::UNSPECIFIED)
2621 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2622 }
2623 }
2624
2625 if(nodes.size() == 2){
2626
2627 // old finger
2628 if(nodes.at(0) != pre){
2629 if(pre != NodeID::UNSPECIFIED)
2630 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2631 pre = NodeID::UNSPECIFIED;
2632 }
2633 if(nodes.at(1) != suc){
2634 if(suc != NodeID::UNSPECIFIED)
2635 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2636 suc = NodeID::UNSPECIFIED;
2637 }
2638
2639 // connect with fingers
2640 if(pre == NodeID::UNSPECIFIED){
2641 pre = nodes.at(0);
2642 if(pre != NodeID::UNSPECIFIED)
2643 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2644 }
2645 if(suc == NodeID::UNSPECIFIED){
2646 suc = nodes.at(1);
2647 if(suc != NodeID::UNSPECIFIED)
2648 visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
2649 }
2650
2651 } //if(nodes.size() == 2)
2652
2653// {
2654// logging_error("================================");
2655// logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
2656// logging_error("================================");
2657// if(nodes.size()>= 1){
2658// logging_error("real pre " << nodes.at(0).toString());
2659// logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
2660// }
2661// if(nodes.size()>= 2){
2662// logging_error("real suc " << nodes.at(1).toString());
2663// logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
2664// }
2665// logging_error("================================");
2666// if(pre == NodeID::UNSPECIFIED){
2667// logging_error("pre: unspecified");
2668// }else{
2669// unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
2670// logging_error("pre: " << prei);
2671// }
2672// if(suc == NodeID::UNSPECIFIED){
2673// logging_error("suc: unspecified");
2674// }else{
2675// unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
2676// logging_error("suc: " << suci);
2677// }
2678// logging_error("================================");
2679// }
2680
2681 //
2682 // update base communication links
2683 //
2684
2685 static set<NodeID> linkset;
2686 set<NodeID> remotenodes;
2687 foreach( LinkDescriptor* ld, links ) {
2688 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
2689 continue;
2690
2691 if (ld->routeRecord.size()>1 && ld->relayed) {
2692 for (size_t i=1; i<ld->routeRecord.size(); i++)
2693 remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
2694 } else {
2695 remotenodes.insert(ld->remoteNode);
2696 }
2697 }
2698
2699 // which links are old and need deletion?
2700 bool changed = false;
2701
2702 do{
2703 changed = false;
2704 foreach(NodeID n, linkset){
2705 if(remotenodes.find(n) == remotenodes.end()){
2706 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
2707 linkset.erase(n);
2708 changed = true;
2709 break;
2710 }
2711 }
2712 }while(changed);
2713
2714 // which links are new and need creation?
2715 do{
2716 changed = false;
2717 foreach(NodeID n, remotenodes){
2718 if(linkset.find(n) == linkset.end()){
2719 visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
2720 linkset.insert(n);
2721 changed = true;
2722 break;
2723 }
2724 }
2725 }while(changed);
2726
2727}
2728
2729// ----------------------------------------------------------------------------
2730
2731std::string BaseOverlay::debugInformation() {
2732 std::stringstream s;
2733 int i=0;
2734
2735 // dump overlay information
2736 s << "Long debug info ... [see below]" << endl << endl;
2737 s << "--- overlay information ----------------------" << endl;
2738 s << overlayInterface->debugInformation() << endl;
2739
2740 // dump link state
2741 s << "--- link state -------------------------------" << endl;
2742 foreach( LinkDescriptor* ld, links ) {
2743 s << "link " << i << ": " << ld << endl;
2744 i++;
2745 }
2746 s << endl << endl;
2747
2748 return s.str();
2749}
2750
2751}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.