source: source/ariba/overlay/BaseOverlay.cpp@ 12063

Last change on this file since 12063 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 78.8 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include <sstream>
42#include <iostream>
43#include <string>
44#include <boost/foreach.hpp>
45
46#include "ariba/NodeListener.h"
47#include "ariba/CommunicationListener.h"
48#include "ariba/SideportListener.h"
49
50#include "ariba/overlay/LinkDescriptor.h"
51
52#include "ariba/overlay/messages/OverlayMsg.h"
53#include "ariba/overlay/messages/JoinRequest.h"
54#include "ariba/overlay/messages/JoinReply.h"
55
56#include "ariba/utility/visual/OvlVis.h"
57#include "ariba/utility/visual/DddVis.h"
58#include "ariba/utility/visual/ServerVis.h"
59#include <ariba/utility/misc/sha1.h>
60
61namespace ariba {
62namespace overlay {
63
64using namespace std;
65using ariba::transport::system_priority;
66
67#define visualInstance ariba::utility::DddVis::instance()
68#define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY
69#define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION
70
71
// time constants (in seconds)
#define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s

#define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up
// NOTE: parenthesised so the sum stays one operand wherever the macro is expanded
#define KEEP_ALIVE_TIME_OUT (KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT) // timeout: no data received on this link (incl. keep-alive messages)
#define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s
78
79
80// ----------------------------------------------------------------------------
81
82/* *****************************************************************************
83 * PREREQUISITES
84 * ****************************************************************************/
85
86CommunicationListener* BaseOverlay::getListener( const ServiceID& service ) {
87 if( !communicationListeners.contains( service ) ) {
88 logging_info( "No listener found for service " << service.toString() );
89 return NULL;
90 }
91 CommunicationListener* listener = communicationListeners.get( service );
92 assert( listener != NULL );
93 return listener;
94}
95
96// link descriptor handling ----------------------------------------------------
97
98LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) {
99 foreach( LinkDescriptor* lp, links )
100 if ((communication ? lp->communicationId : lp->overlayId) == link)
101 return lp;
102 return NULL;
103}
104
105const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const {
106 foreach( const LinkDescriptor* lp, links )
107 if ((communication ? lp->communicationId : lp->overlayId) == link)
108 return lp;
109 return NULL;
110}
111
112/// erases a link descriptor
113void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) {
114 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) {
115 LinkDescriptor* ld = *i;
116 if ((communication ? ld->communicationId : ld->overlayId) == link) {
117 delete ld;
118 links.erase(i);
119 break;
120 }
121 }
122}
123
124/// adds a link descriptor
125LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) {
126 LinkDescriptor* desc = getDescriptor( link );
127 if ( desc == NULL ) {
128 desc = new LinkDescriptor();
129 if (!link.isUnspecified()) desc->overlayId = link;
130 links.push_back(desc);
131 }
132 return desc;
133}
134
135/// returns a auto-link descriptor
136LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service )
137{
138 // search for a descriptor that is already up
139 foreach( LinkDescriptor* lp, links )
140 {
141 if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) )
142 return lp;
143 }
144
145 // if not found, search for one that is about to come up...
146 foreach( LinkDescriptor* lp, links )
147 {
148 time_t now = time(NULL);
149
150 if (lp->autolink && lp->remoteNode == node && lp->service == service
151 && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT )
152 return lp;
153 }
154
155 return NULL;
156}
157
158/// stabilizes link information
159void BaseOverlay::stabilizeLinks() {
160 time_t now = time(NULL);
161
162 // send keep-alive messages over established links
163 foreach( LinkDescriptor* ld, links )
164 {
165 if (!ld->up) continue;
166
167 if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME )
168 {
169 logging_debug("[BaseOverlay] Sending KeepAlive over "
170 << ld->to_string()
171 << " after "
172 << difftime( now, ld->keepAliveSent )
173 << "s");
174
175 OverlayMsg msg( OverlayMsg::typeKeepAlive,
176 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
177 msg.setRouteRecord(true);
178 ld->keepAliveSent = now;
179 send_link( &msg, ld->overlayId, system_priority::OVERLAY );
180 }
181 }
182
183 // iterate over all links and check for time boundaries
184 vector<LinkDescriptor*> oldlinks;
185 foreach( LinkDescriptor* ld, links ) {
186
187 // link connection request stale?
188 if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request
189 {
190 logging_info( "Link connection request is stale, closing: " << ld );
191 ld->failed = true;
192 oldlinks.push_back( ld );
193 continue;
194 }
195
196 if (!ld->up) continue;
197
198
199
200
201 // check if link is relayed and retry connecting directly
202 // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply
203 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) {
204 ld->retryCounter--;
205 ld->communicationId = bc->establishLink( ld->endpoint );
206 }
207
208 // remote used as relay flag
209 if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ??
210 ld->relaying = false;
211
212 // drop links that are dropped and not used as relay
213 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) {
214 oldlinks.push_back( ld );
215 continue;
216 }
217
218 // auto-link time exceeded?
219 if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) {
220 oldlinks.push_back( ld );
221 continue;
222 }
223
224 // keep alives missed? yes->
225 if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT )
226 {
227 logging_info( "Link is stale, closing: " << ld );
228 ld->failed = true;
229 oldlinks.push_back( ld );
230 continue;
231 }
232 }
233
234 // drop links
235 foreach( LinkDescriptor* ld, oldlinks ) {
236 logging_info( "Link timed out. Dropping " << ld );
237 ld->relaying = false;
238 dropLink( ld->overlayId );
239 }
240
241
242
243
244 // show link state (debug output)
245 if (counter>=10 || counter<0)
246 {
247 showLinks();
248 counter = 0;
249 }
250 else
251 {
252 counter++;
253 }
254}
255
256
257std::string BaseOverlay::getLinkHTMLInfo() {
258 std::ostringstream s;
259 vector<NodeID> nodes;
260 if (links.size()==0) {
261 s << "<h2 style=\"color=#606060\">No links established!</h2>";
262 } else {
263 s << "<h2 style=\"color=#606060\">Links</h2>";
264 s << "<table width=\"100%\" cellpadding=\"0\" border=\"0\" cellspacing=\"0\">";
265 s << "<tr style=\"background-color=#ffe0e0\">";
266 s << "<td><b>Link ID</b></td><td><b>Remote ID</b></td><td><b>Relay path</b></td>";
267 s << "</tr>";
268
269 int i=0;
270 foreach( LinkDescriptor* ld, links ) {
271 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;
272 bool found = false;
273 foreach(NodeID& id, nodes)
274 if (id == ld->remoteNode) found = true;
275 if (found) continue;
276 i++;
277 nodes.push_back(ld->remoteNode);
278 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">";
279 else s << "<tr>";
280 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>";
281 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>";
282 s << "<td>";
283 if (ld->routeRecord.size()>1 && ld->relayed) {
284 for (size_t i=1; i<ld->routeRecord.size(); i++)
285 s << ld->routeRecord[ld->routeRecord.size()-i-1].toString().substr(0,4) << ".. ";
286 } else {
287 s << "Direct";
288 }
289 s << "</td>";
290 s << "</tr>";
291 }
292 s << "</table>";
293 }
294 return s.str();
295}
296
297/// shows the current link state
298void BaseOverlay::showLinks() {
299 int i=0;
300 logging_info("--- link state -------------------------------");
301 foreach( LinkDescriptor* ld, links ) {
302 string epd = "";
303 if (isLinkDirectVital(ld))
304 {
305// epd = getEndpointDescriptor(ld->remoteNode).toString();
306
307 epd = "Connection: ";
308 epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string();
309 epd += " <---> ";
310 epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string();
311 }
312
313 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd);
314 i++;
315 }
316 logging_info("----------------------------------------------");
317}
318
319/// compares two arbitrary links to the same node
320int BaseOverlay::compare( const LinkID& lhs, const LinkID& rhs ) {
321 LinkDescriptor* lhsld = getDescriptor(lhs);
322 LinkDescriptor* rhsld = getDescriptor(rhs);
323 if (lhsld==NULL || rhsld==NULL
324 || !lhsld->up || !rhsld->up
325 || lhsld->remoteNode != rhsld->remoteNode) return -1;
326
327 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) )
328 return -1;
329
330 return 1;
331}
332
333
334// internal message delivery ---------------------------------------------------
335
336
337seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority )
338{
339 // set priority
340 message->setPriority(priority);
341
342 // wrap old OverlayMsg into reboost message
343 reboost::message_t wrapped_message = message->wrap_up_for_sending();
344
345 // send down to BaseCommunication
346 try
347 {
348 // * send *
349 return bc->sendMessage(bc_link, wrapped_message, priority, false);
350 }
351 catch ( communication::communication_message_not_sent& e )
352 {
353 ostringstream out;
354 out << "Communication message not sent: " << e.what();
355 throw message_not_sent(out.str());
356 }
357
358 throw logic_error("This should never happen!");
359}
360
361
362/// routes a message to its destination node
363void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop )
364{
365 // exceeded time-to-live? yes-> drop message
366 if (message->getNumHops() > message->getTimeToLive())
367 {
368 logging_warn("Message exceeded TTL. Dropping message and relay routes "
369 << "for recovery. Hop count: " << (int) message->getNumHops());
370 removeRelayNode(message->getDestinationNode());
371 return;
372 }
373
374 // no-> forward message
375 else
376 {
377 // destinastion myself? yes-> handle message
378 if (message->getDestinationNode() == nodeId)
379 {
380 logging_warn("Usually I should not route messages to myself. And I won't!");
381 }
382
383 // no->send message to next hop
384 else
385 {
386 try
387 {
388 /* (deep) packet inspection to determine priority */
389 // BRANCH: typeData --> send with low priority
390 if ( message->getType() == OverlayMsg::typeData )
391 {
392 // TODO think about implementing explicit routing queue (with active queue management??)
393 send( message,
394 message->getDestinationNode(),
395 message->getPriority(),
396 last_hop );
397 }
398 // BRANCH: internal message --> send with higher priority
399 else
400 {
401 send( message,
402 message->getDestinationNode(),
403 system_priority::HIGH,
404 last_hop );
405 }
406 }
407 catch ( message_not_sent& e )
408 {
409 logging_warn("Unable to route message of type "
410 << message->getType()
411 << " to "
412 << message->getDestinationNode()
413 << ". Reason: "
414 << e.what());
415
416 // inform sender
417 if ( message->getType() != OverlayMsg::typeMessageLost )
418 {
419 report_lost_message(message);
420 }
421 }
422 }
423 }
424}
425
426void BaseOverlay::report_lost_message( const OverlayMsg* message )
427{
428 OverlayMsg reply(OverlayMsg::typeMessageLost);
429 reply.setSeqNum(message->getSeqNum());
430
431 /**
432 * MessageLost-Message
433 *
434 * - Type of lost message
435 * - Hop count of lost message
436 * - Source-LinkID of lost message
437 */
438 reboost::shared_buffer_t b(sizeof(uint8_t)*2);
439 b.mutable_data()[0] = message->getType();
440 b.mutable_data()[1] = message->getNumHops();
441 reply.append_buffer(b);
442 reply.append_buffer(message->getSourceLink().serialize());
443
444 try
445 {
446 send_node(&reply, message->getSourceNode(),
447 system_priority::OVERLAY,
448 OverlayInterface::OVERLAY_SERVICE_ID);
449 }
450 catch ( message_not_sent& e )
451 {
452 logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too.");
453 }
454}
455
456/// sends a message to another node, delivers it to the base overlay class
457seqnum_t BaseOverlay::send( OverlayMsg* message,
458 const NodeID& destination,
459 uint8_t priority,
460 const NodeID& last_hop ) throw(message_not_sent)
461{
462 LinkDescriptor* next_link = NULL;
463
464 // drop messages to unspecified destinations
465 if (destination.isUnspecified())
466 throw message_not_sent("No destination specified. Drop!");
467
468 // send messages to myself -> drop!
469 // TODO maybe this is not what we want. why not just deliver this message?
470 // There is a similar check in the route function, there it should be okay.
471 if (destination == nodeId)
472 {
473 logging_warn("Sent message to myself. Drop!");
474
475 throw message_not_sent("Sent message to myself. Drop!");
476 }
477
478 // use relay path?
479 if (message->isRelayed())
480 {
481 next_link = getRelayLinkTo( destination );
482
483 if (next_link != NULL)
484 {
485 next_link->setRelaying();
486
487 // * send message over relayed link *
488 return send_overlaymessage_down(message, next_link->communicationId, priority);
489 }
490 else
491 {
492 logging_warn("No relay hop found to " << destination
493 << " -- trying to route over overlay paths ...")
494 }
495 }
496
497
498 // last resort -> route over overlay path
499 LinkID next_id = overlayInterface->getNextLinkId( destination );
500 if ( next_id.isUnspecified() )
501 {
502 // apperently we're the closest node --> try second best node
503 // NOTE: This is helpful if our routing table is not up-to-date, but
504 // may lead to circles. So we have to be careful.
505 std::vector<const LinkID*> next_ids =
506 overlayInterface->getSortedLinkIdsTowardsNode( destination );
507
508 for ( int i = 0; i < next_ids.size(); i++ )
509 {
510 const LinkID& link = *next_ids[i];
511
512 if ( ! link.isUnspecified() )
513 {
514 next_id = link;
515
516 break;
517 }
518 }
519
520 // still no next hop found. drop.
521 if ( next_id.isUnspecified() )
522 {
523 logging_warn("Could not send message. No next hop found to " <<
524 destination );
525 logging_error("ERROR: " << debugInformation() );
526
527 throw message_not_sent("No next hop found.");
528 }
529 }
530
531
532 /* get link descriptor, do some checks and send message */
533 next_link = getDescriptor(next_id);
534
535 // check pointer
536 if ( next_link == NULL )
537 {
538 // NOTE: this shuldn't happen
539 throw message_not_sent("Could not send message. Link not known.");
540 }
541
542 // avoid circles
543 if ( next_link->remoteNode == last_hop )
544 {
545 // XXX logging_debug
546 logging_info("Possible next hop would create a circle: "
547 << next_link->remoteNode);
548
549 throw message_not_sent("Could not send message. Possible next hop would create a circle.");
550 }
551
552 // check if link is up
553 if ( ! next_link->up)
554 {
555 logging_warn("Could not send message. Link not up");
556 logging_error("ERROR: " << debugInformation() );
557
558 throw message_not_sent("Could not send message. Link not up");
559 }
560
561 // * send message over overlay link *
562 return send(message, next_link, priority);
563}
564
565
566/// send a message using a link descriptor, delivers it to the base overlay class
567seqnum_t BaseOverlay::send( OverlayMsg* message,
568 LinkDescriptor* ldr,
569 uint8_t priority ) throw(message_not_sent)
570{
571 // check if null
572 if (ldr == NULL)
573 {
574 ostringstream out;
575 out << "Can not send message to " << message->getDestinationAddress();
576 throw message_not_sent(out.str());
577 }
578
579 // check if up
580 if ( !ldr->up )
581 {
582 logging_error("DEBUG_INFO: " << debugInformation() );
583
584 ostringstream out;
585 out << "Can not send message. Link not up:" << ldr->to_string();
586 throw message_not_sent(out.str());
587 }
588
589 LinkDescriptor* next_hop_ld = NULL;
590
591 // BRANCH: relayed link
592 if (ldr->relayed)
593 {
594 logging_debug("Resolving direct link for relayed link to "
595 << ldr->remoteNode);
596
597 next_hop_ld = getRelayLinkTo( ldr->remoteNode );
598
599 if (next_hop_ld==NULL)
600 {
601 logging_error("DEBUG_INFO: " << debugInformation() );
602
603 ostringstream out;
604 out << "No relay path found to link: " << ldr;
605 throw message_not_sent(out.str());
606 }
607
608 next_hop_ld->setRelaying();
609 message->setRelayed(true);
610 }
611 // BRANCH: direct link
612 else
613 {
614 next_hop_ld = ldr;
615 }
616
617
618 // check next hop-link
619 if ( ! next_hop_ld->communicationUp)
620 {
621 throw message_not_sent( "send(): Could not send message."
622 " Not a relayed link and direct link is not up." );
623 }
624
625 // send over next link
626 logging_debug("send(): Sending message over direct link.");
627 return send_overlaymessage_down(message, next_hop_ld->communicationId, priority);
628
629}
630
631seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote,
632 uint8_t priority, const ServiceID& service) throw(message_not_sent)
633{
634 message->setSourceNode(nodeId);
635 message->setDestinationNode(remote);
636 message->setService(service);
637 return send( message, remote, priority );
638}
639
640void BaseOverlay::send_link( OverlayMsg* message,
641 const LinkID& link,
642 uint8_t priority ) throw(message_not_sent)
643{
644 LinkDescriptor* ld = getDescriptor(link);
645 if (ld==NULL)
646 {
647 throw message_not_sent("Cannot find descriptor to link id=" + link.toString());
648 }
649
650 message->setSourceNode(nodeId);
651 message->setDestinationNode(ld->remoteNode);
652
653 message->setSourceLink(ld->overlayId);
654 message->setDestinationLink(ld->remoteLink);
655
656 message->setService(ld->service);
657 message->setRelayed(ld->relayed);
658
659
660 try
661 {
662 // * send message *
663 send( message, ld, priority );
664 }
665 catch ( message_not_sent& e )
666 {
667 // drop failed link
668 ld->failed = true;
669 dropLink(ld->overlayId);
670 }
671}
672
673// relay route management ------------------------------------------------------
674
675/// stabilize relay information
676void BaseOverlay::stabilizeRelays() {
677 vector<relay_route>::iterator i = relay_routes.begin();
678 while (i!=relay_routes.end() ) {
679 relay_route& route = *i;
680 LinkDescriptor* ld = getDescriptor(route.link);
681
682 // relay link still used and alive?
683 if (ld==NULL
684 || !isLinkDirectVital(ld)
685 || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better?
686 {
687 logging_info("Forgetting relay information to node "
688 << route.node.toString() );
689 i = relay_routes.erase(i);
690 } else
691 i++;
692 }
693}
694
695void BaseOverlay::removeRelayLink( const LinkID& link ) {
696 vector<relay_route>::iterator i = relay_routes.begin();
697 while (i!=relay_routes.end() ) {
698 relay_route& route = *i;
699 if (route.link == link ) i = relay_routes.erase(i); else i++;
700 }
701}
702
703void BaseOverlay::removeRelayNode( const NodeID& remote ) {
704 vector<relay_route>::iterator i = relay_routes.begin();
705 while (i!=relay_routes.end() ) {
706 relay_route& route = *i;
707 if (route.node == remote ) i = relay_routes.erase(i); else i++;
708 }
709}
710
711/// refreshes relay information
712void BaseOverlay::refreshRelayInformation( const OverlayMsg* message, LinkDescriptor* ld ) {
713
714 // handle relayed messages from real links only
715 if (ld == NULL
716 || ld->relayed
717 || message->getSourceNode()==nodeId ) return;
718
719 // update usage information
720 if (message->isRelayed()) {
721 // try to find source node
722 foreach( relay_route& route, relay_routes ) {
723 // relay route found? yes->
724 if ( route.node == message->getDestinationNode() ) {
725 ld->setRelaying();
726 route.used = time(NULL);
727 }
728 }
729
730 }
731
732 // register relay path
733 if (message->isRegisterRelay()) {
734 // set relaying
735 ld->setRelaying();
736
737 // try to find source node
738 foreach( relay_route& route, relay_routes ) {
739
740 // relay route found? yes->
741 if ( route.node == message->getSourceNode() ) {
742
743 // refresh timer
744 route.used = time(NULL);
745 LinkDescriptor* rld = getDescriptor(route.link);
746
747 // route has a shorter hop count or old link is dead? yes-> replace
748 if (route.hops > message->getNumHops()
749 || rld == NULL
750 || !isLinkDirectVital(ld)) {
751 logging_info("Updating relay information to node "
752 << route.node.toString()
753 << " reducing to " << (int) message->getNumHops() << " hops.");
754 route.hops = message->getNumHops();
755 route.link = ld->overlayId;
756 }
757 return;
758 }
759 }
760
761 // not found-> add new entry
762 relay_route route;
763 route.hops = message->getNumHops();
764 route.link = ld->overlayId;
765 route.node = message->getSourceNode();
766 route.used = time(NULL);
767 logging_info("Remembering relay information to node "
768 << route.node.toString());
769 relay_routes.push_back(route);
770 }
771}
772
773/// returns a known "vital" relay link which is up and running
774LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) {
775 // try to find source node
776 foreach( relay_route& route, relay_routes ) {
777 if (route.node == remote ) {
778 LinkDescriptor* ld = getDescriptor( route.link );
779 if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else {
780 route.used = time(NULL);
781 return ld;
782 }
783 }
784 }
785 return NULL;
786}
787
788/* *****************************************************************************
789 * PUBLIC MEMBERS
790 * ****************************************************************************/
791
792use_logging_cpp(BaseOverlay);
793
794// ----------------------------------------------------------------------------
795
796BaseOverlay::BaseOverlay() :
797 started(false),state(BaseOverlayStateInvalid),
798 bc(NULL),
799 nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
800 sideport(&SideportListener::DEFAULT), overlayInterface(NULL),
801 counter(0) {
802}
803
804BaseOverlay::~BaseOverlay() {
805}
806
807// ----------------------------------------------------------------------------
808
809void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) {
810 logging_info("Starting...");
811
812 // set parameters
813 bc = _basecomm;
814 nodeId = _nodeid;
815
816 // register at base communication
817 bc->registerMessageReceiver( this );
818 bc->registerEventListener( this );
819
820 // timer for auto link management
821 Timer::setInterval( 1000 ); // XXX
822// Timer::setInterval( 10000 );
823 Timer::start();
824
825 started = true;
826 state = BaseOverlayStateInvalid;
827}
828
829void BaseOverlay::stop() {
830 logging_info("Stopping...");
831
832 // stop timer
833 Timer::stop();
834
835 // delete oberlay interface
836 if(overlayInterface != NULL) {
837 delete overlayInterface;
838 overlayInterface = NULL;
839 }
840
841 // unregister at base communication
842 bc->unregisterMessageReceiver( this );
843 bc->unregisterEventListener( this );
844
845 started = false;
846 state = BaseOverlayStateInvalid;
847}
848
849bool BaseOverlay::isStarted(){
850 return started;
851}
852
853// ----------------------------------------------------------------------------
854
855void BaseOverlay::joinSpoVNet(const SpoVNetID& id,
856 const EndpointDescriptor& bootstrapEp) {
857
858 if(id != spovnetId){
859 logging_error("attempt to join against invalid spovnet, call initiate first");
860 return;
861 }
862
863 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
864 logging_info( "Starting to join spovnet " << id.toString() <<
865 " with nodeid " << nodeId.toString());
866
867 if(bootstrapEp.isUnspecified() && state == BaseOverlayStateInvalid){
868
869 //** FIRST STEP - MANDATORY */
870
871 // bootstrap against ourselfs
872 logging_info("joining spovnet locally");
873
874 overlayInterface->joinOverlay();
875 state = BaseOverlayStateCompleted;
876 foreach( NodeListener* i, nodeListeners )
877 i->onJoinCompleted( spovnetId );
878
879 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
880 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN );
881
882 } else {
883
884 //** SECOND STEP - OPTIONAL */
885
886 // bootstrap against another node
887 logging_info("joining spovnet remotely against " << bootstrapEp.toString());
888
889 const LinkID& lnk = bc->establishLink( bootstrapEp );
890 bootstrapLinks.push_back(lnk);
891 logging_info("join process initiated for " << id.toString() << "...");
892 }
893}
894
895
896void BaseOverlay::startBootstrapModules(vector<pair<BootstrapManager::BootstrapType,string> > modules){
897 logging_debug("starting overlay bootstrap module");
898 overlayBootstrap.start(this, spovnetId, nodeId, modules);
899 overlayBootstrap.publish(bc->getEndpointDescriptor());
900}
901
902void BaseOverlay::stopBootstrapModules(){
903 logging_debug("stopping overlay bootstrap module");
904 overlayBootstrap.stop();
905 overlayBootstrap.revoke();
906}
907
908void BaseOverlay::leaveSpoVNet() {
909
910 logging_info( "Leaving spovnet " << spovnetId );
911 bool ret = ( state != this->BaseOverlayStateInvalid );
912
913 logging_debug( "Dropping all auto-links" );
914
915 // gather all service links
916 vector<LinkID> servicelinks;
917 foreach( LinkDescriptor* ld, links )
918 {
919 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID )
920 servicelinks.push_back( ld->overlayId );
921 }
922
923 // drop all service links
924 foreach( LinkID lnk, servicelinks )
925 {
926 logging_debug("Dropping service link " << lnk.toString());
927 dropLink( lnk );
928 }
929
930 // let the node leave the spovnet overlay interface
931 logging_debug( "Leaving overlay" );
932 if( overlayInterface != NULL )
933 {
934 overlayInterface->leaveOverlay();
935 }
936
937 // drop still open bootstrap links
938 foreach( LinkID lnk, bootstrapLinks )
939 {
940 logging_debug("Dropping bootstrap link " << lnk.toString());
941 bc->dropLink( lnk );
942 }
943
944 // change to inalid state
945 state = BaseOverlayStateInvalid;
946 //ovl.visShutdown( ovlId, nodeId, string("") );
947
948 visualInstance.visShutdown(visualIdOverlay, nodeId, "");
949 visualInstance.visShutdown(visualIdBase, nodeId, "");
950
951 // inform all registered services of the event
952 foreach( NodeListener* i, nodeListeners )
953 {
954 if( ret ) i->onLeaveCompleted( spovnetId );
955 else i->onLeaveFailed( spovnetId );
956 }
957}
958
959void BaseOverlay::createSpoVNet(const SpoVNetID& id,
960 const OverlayParameterSet& param,
961 const SecurityParameterSet& sec,
962 const QoSParameterSet& qos) {
963
964 // set the state that we are an initiator, this way incoming messages are
965 // handled correctly
966 logging_info( "creating spovnet " + id.toString() <<
967 " with nodeid " << nodeId.toString() );
968
969 spovnetId = id;
970
971 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
972 if( overlayInterface == NULL ) {
973 logging_fatal( "overlay structure not supported" );
974 state = BaseOverlayStateInvalid;
975
976 foreach( NodeListener* i, nodeListeners )
977 i->onJoinFailed( spovnetId );
978
979 return;
980 }
981
982 visualInstance.visCreate(visualIdBase, nodeId, "", "");
983 visualInstance.visCreate(visualIdOverlay, nodeId, "", "");
984}
985
986// ----------------------------------------------------------------------------
987
988const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp,
989 const NodeID& remoteId, const ServiceID& service ) {
990
991 // establish link via overlay
992 if (!remoteId.isUnspecified())
993 return establishLink( remoteId, service );
994 else
995 return establishDirectLink(remoteEp, service );
996}
997
998/// call base communication's establish link and add link mapping
999const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep,
1000 const ServiceID& service ) {
1001
1002 /// find a service listener
1003 if( !communicationListeners.contains( service ) ) {
1004 logging_error( "No listener registered for service id=" << service.toString() );
1005 return LinkID::UNSPECIFIED;
1006 }
1007 CommunicationListener* listener = communicationListeners.get( service );
1008 assert( listener != NULL );
1009
1010 // create descriptor
1011 LinkDescriptor* ld = addDescriptor();
1012 ld->relayed = false;
1013 ld->listener = listener;
1014 ld->service = service;
1015 ld->communicationId = bc->establishLink( ep );
1016
1017 /// establish link and add mapping
1018 logging_info("Establishing direct link " << ld->communicationId.toString()
1019 << " using " << ep.toString());
1020
1021 return ld->communicationId;
1022}
1023
1024/// establishes a link between two arbitrary nodes
1025const LinkID BaseOverlay::establishLink( const NodeID& remote,
1026 const ServiceID& service ) {
1027
1028 // TODO What if we already have a Link to this node and this service id?
1029
1030 // do not establish a link to myself!
1031 if (remote == nodeId) return
1032 LinkID::UNSPECIFIED;
1033
1034
1035 // create a link descriptor
1036 LinkDescriptor* ld = addDescriptor();
1037 ld->relayed = true;
1038 ld->remoteNode = remote;
1039 ld->service = service;
1040 ld->listener = getListener(ld->service);
1041
1042 // initialize sequence numbers
1043 ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
1044 logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum);
1045
1046 // create link request message
1047 OverlayMsg msg(OverlayMsg::typeLinkRequest, service, nodeId, remote );
1048 msg.setSourceLink(ld->overlayId);
1049
1050 // send over relayed link
1051 msg.setRelayed(true);
1052 msg.setRegisterRelay(true);
1053// msg.setRouteRecord(true);
1054
1055 msg.setSeqNum(ld->last_sent_seqnum);
1056
1057 // debug message
1058 logging_info(
1059 "Sending link request with"
1060 << " link=" << ld->overlayId.toString()
1061 << " node=" << ld->remoteNode.toString()
1062 << " serv=" << ld->service.toString()
1063 );
1064
1065
1066 // sending message to node
1067 try
1068 {
1069 // * send *
1070 seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service );
1071 }
1072 catch ( message_not_sent& e )
1073 {
1074 logging_warn("Link request not sent: " << e.what());
1075
1076 // Message not sent. Cancel link request.
1077 SystemQueue::instance().scheduleCall(
1078 boost::bind(
1079 &BaseOverlay::__onLinkEstablishmentFailed,
1080 this,
1081 ld->overlayId)
1082 );
1083 }
1084
1085 return ld->overlayId;
1086}
1087
1088/// NOTE: "id" is an Overlay-LinkID
1089void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id)
1090{
1091 // TODO This code redundant. But also it's not easy to aggregate in one function.
1092
1093 // get descriptor for link
1094 LinkDescriptor* ld = getDescriptor(id, false);
1095 if ( ld == NULL ) return; // not found? ->ignore!
1096
1097 logging_debug( "__onLinkEstablishmentFaild: " << ld );
1098
1099 // removing relay link information
1100 removeRelayLink(ld->overlayId);
1101
1102 // inform listeners about link down
1103 ld->communicationUp = false;
1104 if (!ld->service.isUnspecified())
1105 {
1106 CommunicationListener* lst = getListener(ld->service);
1107 if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode );
1108 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1109 }
1110
1111 // delete all queued messages (auto links)
1112 if( ld->messageQueue.size() > 0 ) {
1113 logging_warn( "Dropping link " << id.toString() << " that has "
1114 << ld->messageQueue.size() << " waiting messages" );
1115 ld->flushQueue();
1116 }
1117
1118 // erase mapping
1119 eraseDescriptor(ld->overlayId);
1120}
1121
1122
1123/// drops an established link
1124void BaseOverlay::dropLink(const LinkID& link)
1125{
1126 logging_info( "Dropping link: " << link.toString() );
1127
1128 // find the link item to drop
1129 LinkDescriptor* ld = getDescriptor(link);
1130 if( ld == NULL )
1131 {
1132 logging_warn( "Can't drop link, link is unknown!");
1133 return;
1134 }
1135
1136 // delete all queued messages
1137 if( ld->messageQueue.size() > 0 )
1138 {
1139 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has "
1140 << ld->messageQueue.size() << " waiting messages" );
1141 ld->flushQueue();
1142 }
1143
1144
1145 // inform application and remote note (but only once)
1146 // NOTE: If we initiated the drop, this function is called twice, but on
1147 // the second call, there is noting to do.
1148 if ( ld->up && ! ld->failed )
1149 {
1150 // inform sideport and listener
1151 if(ld->listener != NULL)
1152 {
1153 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode );
1154 }
1155 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId );
1156
1157 // send link-close to remote node
1158 logging_info("Sending LinkClose message to remote node.");
1159 OverlayMsg close_msg(OverlayMsg::typeLinkClose);
1160 send_link(&close_msg, link, system_priority::OVERLAY);
1161
1162 // deactivate link
1163 ld->up = false;
1164// ld->closing = true;
1165 }
1166
1167 else if ( ld->failed )
1168 {
1169 // inform listener
1170 if( ld->listener != NULL )
1171 {
1172 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1173 }
1174
1175 ld->up = false;
1176 __removeDroppedLink(ld->overlayId);
1177 }
1178}
1179
1180/// called from typeLinkClose-handler
1181void BaseOverlay::__removeDroppedLink(const LinkID& link)
1182{
1183 // find the link item to drop
1184 LinkDescriptor* ld = getDescriptor(link);
1185 if( ld == NULL )
1186 {
1187 return;
1188 }
1189
1190 // do not drop relay links
1191 if (!ld->relaying)
1192 {
1193 // drop the link in base communication
1194 if (ld->communicationUp)
1195 {
1196 bc->dropLink( ld->communicationId );
1197 }
1198
1199 // erase descriptor
1200 eraseDescriptor( ld->overlayId );
1201 }
1202 else
1203 {
1204 ld->dropAfterRelaying = true;
1205 }
1206}
1207
1208// ----------------------------------------------------------------------------
1209
1210/// internal send message, always use this functions to send messages over links
1211const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message,
1212 const LinkID& link,
1213 uint8_t priority ) throw(message_not_sent)
1214{
1215 logging_debug( "Sending data message on link " << link.toString() );
1216
1217 // get the mapping for this link
1218 LinkDescriptor* ld = getDescriptor(link);
1219 if( ld == NULL )
1220 {
1221 throw message_not_sent("Could not send message. Link not found id=" + link.toString());
1222 }
1223
1224 // check if the link is up yet, if its an auto link queue message
1225 if( !ld->up )
1226 {
1227 ld->setAutoUsed();
1228 if( ld->autolink )
1229 {
1230 logging_info("Auto-link " << link.toString() << " not up, queue message");
1231
1232 // queue message
1233 LinkDescriptor::message_queue_entry msg;
1234 msg.message = message;
1235 msg.priority = priority;
1236
1237 ld->messageQueue.push_back( msg );
1238
1239 return SequenceNumber::DISABLED; // TODO what to return if message is queued?
1240 }
1241 else
1242 {
1243 throw message_not_sent("Link " + link.toString() + " not up, drop message");
1244 }
1245 }
1246
1247 // TODO AKTUELL: sequence numbers
1248 // TODO seqnum on fast path ?
1249 ld->last_sent_seqnum.increment();
1250
1251 /* choose fast-path for direct links; normal overlay-path otherwise */
1252 // BRANCH: direct link
1253 if ( ld->communicationUp && !ld->relayed )
1254 {
1255 // * send down to BaseCommunication *
1256 try
1257 {
1258 bc->sendMessage(ld->communicationId, message, priority, true);
1259 }
1260 catch ( communication::communication_message_not_sent& e )
1261 {
1262 ostringstream out;
1263 out << "Communication message on fast-path not sent: " << e.what();
1264 throw message_not_sent(out.str());
1265 }
1266 }
1267
1268 // BRANCH: use (slow) overlay-path
1269 else
1270 {
1271 // compile overlay message (has service and node id)
1272 OverlayMsg overmsg( OverlayMsg::typeData );
1273 overmsg.set_payload_message(message);
1274
1275 // set SeqNum
1276 if ( ld->transmit_seqnums )
1277 {
1278 overmsg.setSeqNum(ld->last_sent_seqnum);
1279 }
1280 logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum());
1281
1282 // send message over relay/direct/overlay
1283 send_link( &overmsg, ld->overlayId, priority );
1284 }
1285
1286 // return seqnum
1287 return ld->last_sent_seqnum;
1288}
1289
1290
1291const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message,
1292 const NodeID& node, uint8_t priority, const ServiceID& service) {
1293
1294 // find link for node and service
1295 LinkDescriptor* ld = getAutoDescriptor( node, service );
1296
1297 // if we found no link, create an auto link
1298 if( ld == NULL ) {
1299
1300 // debug output
1301 logging_info( "No link to send message to node "
1302 << node.toString() << " found for service "
1303 << service.toString() << ". Creating auto link ..."
1304 );
1305
1306 // call base overlay to create a link
1307 LinkID link = establishLink( node, service );
1308 ld = getDescriptor( link );
1309 if( ld == NULL ) {
1310 logging_error( "Failed to establish auto-link.");
1311 throw message_not_sent("Failed to establish auto-link.");
1312 }
1313 ld->autolink = true;
1314
1315 logging_debug( "Auto-link establishment in progress to node "
1316 << node.toString() << " with link id=" << link.toString() );
1317 }
1318 assert(ld != NULL);
1319
1320 // mark the link as used, as we now send a message through it
1321 ld->setAutoUsed();
1322
1323 // send / queue message
1324 return sendMessage( message, ld->overlayId, priority );
1325}
1326
1327
1328NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message,
1329 const NodeID& address, uint8_t priority, const ServiceID& service) {
1330
1331 if ( overlayInterface->isClosestNodeTo(address) )
1332 {
1333 return NodeID::UNSPECIFIED;
1334 }
1335
1336 const NodeID& closest_node = overlayInterface->getNextNodeId(address);
1337
1338 if ( closest_node != NodeID::UNSPECIFIED )
1339 {
1340 sendMessage(message, closest_node, priority, service);
1341 }
1342
1343 return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference?
1344}
1345// ----------------------------------------------------------------------------
1346
1347const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1348 const LinkID& link) const {
1349
1350 // return own end-point descriptor
1351 if( link.isUnspecified() )
1352 return bc->getEndpointDescriptor();
1353
1354 // find link descriptor. not found -> return unspecified
1355 const LinkDescriptor* ld = getDescriptor(link);
1356 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED();
1357
1358 // return endpoint-descriptor from base communication
1359 return bc->getEndpointDescriptor( ld->communicationId );
1360}
1361
1362const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(
1363 const NodeID& node) const {
1364
1365 // return own end-point descriptor
1366 if( node == nodeId || node.isUnspecified() ) {
1367 //logging_info("getEndpointDescriptor: returning self.");
1368 return bc->getEndpointDescriptor();
1369 }
1370
1371 // no joined and request remote descriptor? -> fail!
1372 if( overlayInterface == NULL ) {
1373 logging_error( "Overlay interface not set, cannot resolve end-point." );
1374 return EndpointDescriptor::UNSPECIFIED();
1375 }
1376
1377// // resolve end-point descriptor from the base-overlay routing table
1378// const EndpointDescriptor& ep = overlayInterface->resolveNode( node );
1379// if(ep.toString() != "") return ep;
1380
1381 // see if we can find the node in our own table
1382 foreach(const LinkDescriptor* ld, links){
1383 if(ld->remoteNode != node) continue;
1384 if(!ld->communicationUp) continue;
1385 const EndpointDescriptor& ep =
1386 bc->getEndpointDescriptor(ld->communicationId);
1387 if(ep != EndpointDescriptor::UNSPECIFIED()) {
1388 //logging_info("getEndpointDescriptor: using " << ld->to_string());
1389 return ep;
1390 }
1391 }
1392
1393 logging_warn( "No EndpointDescriptor found for node " << node );
1394 logging_warn( const_cast<BaseOverlay*>(this)->debugInformation() );
1395
1396 return EndpointDescriptor::UNSPECIFIED();
1397}
1398
1399// ----------------------------------------------------------------------------
1400
1401bool BaseOverlay::registerSidePort(SideportListener* _sideport) {
1402 sideport = _sideport;
1403 _sideport->configure( this );
1404 return true;
1405}
1406
1407bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) {
1408 sideport = &SideportListener::DEFAULT;
1409 return true;
1410}
1411
1412// ----------------------------------------------------------------------------
1413
1414bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) {
1415 logging_debug( "binding communication listener " << listener
1416 << " on serviceid " << sid.toString() );
1417
1418 if( communicationListeners.contains( sid ) ) {
1419 logging_error( "some listener already registered for service id "
1420 << sid.toString() );
1421 return false;
1422 }
1423
1424 communicationListeners.registerItem( listener, sid );
1425 return true;
1426}
1427
1428
1429bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) {
1430 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() );
1431
1432 if( !communicationListeners.contains( sid ) ) {
1433 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
1434 return false;
1435 }
1436
1437 if( communicationListeners.get(sid) != listener ) {
1438 logging_warn( "listener bound to service id " << sid.toString()
1439 << " is different than listener trying to unbind" );
1440 return false;
1441 }
1442
1443 communicationListeners.unregisterItem( sid );
1444 return true;
1445}
1446
1447// ----------------------------------------------------------------------------
1448
1449bool BaseOverlay::bind(NodeListener* listener) {
1450 logging_debug( "Binding node listener " << listener );
1451
1452 // already bound? yes-> warning
1453 NodeListenerVector::iterator i =
1454 find( nodeListeners.begin(), nodeListeners.end(), listener );
1455 if( i != nodeListeners.end() ) {
1456 logging_warn("Node listener " << listener << " is already bound!" );
1457 return false;
1458 }
1459
1460 // no-> add
1461 nodeListeners.push_back( listener );
1462 return true;
1463}
1464
1465bool BaseOverlay::unbind(NodeListener* listener) {
1466 logging_debug( "Unbinding node listener " << listener );
1467
1468 // already unbound? yes-> warning
1469 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
1470 if( i == nodeListeners.end() ) {
1471 logging_warn( "Node listener " << listener << " is not bound!" );
1472 return false;
1473 }
1474
1475 // no-> remove
1476 nodeListeners.erase( i );
1477 return true;
1478}
1479
1480// ----------------------------------------------------------------------------
1481
1482void BaseOverlay::onLinkUp(const LinkID& id,
1483 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
1484{
1485 logging_debug( "Link up with base communication link id=" << id );
1486
1487 // get descriptor for link
1488 LinkDescriptor* ld = getDescriptor(id, true);
1489
1490 // BRANCH: handle bootstrap link we initiated
1491 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){
1492 logging_info(
1493 "Join has been initiated by me and the link is now up. " <<
1494 "LinkID: " << id.toString() <<
1495 "Sending out join request for SpoVNet " << spovnetId.toString()
1496 );
1497
1498 // send join request message
1499 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest,
1500 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1501 JoinRequest joinRequest( spovnetId, nodeId );
1502 overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer());
1503
1504 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1505
1506 return;
1507 }
1508
1509 // BRANCH: link establishment from remote, add one!
1510 if (ld == NULL) {
1511 ld = addDescriptor( id );
1512 logging_info( "onLinkUp (remote request) descriptor: " << ld );
1513
1514 // update descriptor
1515 ld->fromRemote = true;
1516 ld->communicationId = id;
1517 ld->communicationUp = true;
1518 ld->setAutoUsed();
1519 ld->setAlive();
1520
1521 // in this case, do not inform listener, since service it unknown
1522 // -> wait for update message!
1523 }
1524
1525 // BRANCH: We requested this link in the first place
1526 else
1527 {
1528 logging_info( "onLinkUp descriptor (initiated locally):" << ld );
1529
1530 // update descriptor
1531 ld->setAutoUsed();
1532 ld->setAlive();
1533 ld->communicationUp = true;
1534 ld->fromRemote = false;
1535
1536 // BRANCH: this was a relayed link before --> convert to direct link
1537 // TODO do we really have to send a message here?
1538 if (ld->relayed)
1539 {
1540 ld->up = true;
1541 ld->relayed = false;
1542 logging_info( "Converting to direct link: " << ld );
1543
1544 // send message
1545 OverlayMsg overMsg( OverlayMsg::typeLinkDirect );
1546 overMsg.setSourceLink( ld->overlayId );
1547 overMsg.setDestinationLink( ld->remoteLink );
1548 send_link( &overMsg, ld->overlayId, system_priority::OVERLAY );
1549
1550 // inform listener
1551 if( ld->listener != NULL)
1552 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1553 }
1554
1555
1556 /* NOTE: Chord is opening direct-links in it's setup routine which are
1557 * neither set to "relayed" nor to "up". To activate these links a
1558 * typeLinkUpdate must be sent.
1559 *
1560 * This branch is would also be taken when we had a working link before
1561 * (ld->up == true). I'm not sure if this case does actually happen
1562 * and whether it's tested.
1563 */
1564 else
1565 {
1566 // note: necessary to validate the link on the remote side!
1567 logging_info( "Sending out update" <<
1568 " for service " << ld->service.toString() <<
1569 " with local node id " << nodeId.toString() <<
1570 " on link " << ld->overlayId.toString() );
1571
1572 // compile and send update message
1573 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate );
1574 overlayMsg.setAutoLink( ld->autolink );
1575 overlayMsg.setSourceNode(nodeId);
1576 overlayMsg.setDestinationNode(ld->remoteNode);
1577 overlayMsg.setSourceLink(ld->overlayId);
1578 overlayMsg.setDestinationLink(ld->remoteLink);
1579 overlayMsg.setService(ld->service);
1580 overlayMsg.setRelayed(false);
1581
1582 // TODO ld->communicationId = id ??
1583
1584 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY);
1585 }
1586 }
1587}
1588
1589void BaseOverlay::onLinkDown(const LinkID& id,
1590 const addressing2::EndpointPtr local,
1591 const addressing2::EndpointPtr remote)
1592{
1593 // erase bootstrap links
1594 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1595 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1596
1597 // get descriptor for link
1598 LinkDescriptor* ld = getDescriptor(id, true);
1599 if ( ld == NULL ) return; // not found? ->ignore!
1600 logging_info( "onLinkDown descriptor: " << ld );
1601
1602 // removing relay link information
1603 removeRelayLink(ld->overlayId);
1604
1605 // inform listeners about link down
1606 ld->communicationUp = false;
1607 if (!ld->service.isUnspecified()) {
1608 CommunicationListener* lst = getListener(ld->service);
1609 if(lst != NULL) lst->onLinkDown( ld->overlayId, ld->remoteNode );
1610 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId );
1611 }
1612
1613 // delete all queued messages (auto links)
1614 if( ld->messageQueue.size() > 0 ) {
1615 logging_warn( "Dropping link " << id.toString() << " that has "
1616 << ld->messageQueue.size() << " waiting messages" );
1617 ld->flushQueue();
1618 }
1619
1620 // erase mapping
1621 eraseDescriptor(ld->overlayId);
1622}
1623
1624
1625void BaseOverlay::onLinkFail(const LinkID& id,
1626 const addressing2::EndpointPtr local,
1627 const addressing2::EndpointPtr remote)
1628{
1629 logging_debug( "Link fail with base communication link id=" << id );
1630
1631// // erase bootstrap links
1632// vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id );
1633// if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it );
1634//
1635// // get descriptor for link
1636// LinkDescriptor* ld = getDescriptor(id, true);
1637// if ( ld == NULL ) return; // not found? ->ignore!
1638// logging_debug( "Link failed id=" << ld->overlayId.toString() );
1639//
1640// // inform listeners
1641// ld->listener->onLinkFail( ld->overlayId, ld->remoteNode );
1642// sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId );
1643
1644 logging_debug( " ... calling onLinkDown ..." );
1645 onLinkDown(id, local, remote);
1646}
1647
1648
1649void BaseOverlay::onLinkChanged(const LinkID& id,
1650 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal,
1651 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
1652{
1653 // get descriptor for link
1654 LinkDescriptor* ld = getDescriptor(id, true);
1655 if ( ld == NULL ) return; // not found? ->ignore!
1656 logging_debug( "onLinkChanged descriptor: " << ld );
1657
1658 // inform listeners
1659 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode );
1660 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId );
1661
1662 // autolinks: refresh timestamp
1663 ld->setAutoUsed();
1664}
1665
1666//void BaseOverlay::onLinkQoSChanged(const LinkID& id,
1667// const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
1668// const QoSParameterSet& qos)
1669//{
1670// logging_debug( "Link quality changed with base communication link id=" << id );
1671//
1672// // get descriptor for link
1673// LinkDescriptor* ld = getDescriptor(id, true);
1674// if ( ld == NULL ) return; // not found? ->ignore!
1675// logging_debug( "Link quality changed id=" << ld->overlayId.toString() );
1676//}
1677
1678bool BaseOverlay::onLinkRequest(const LinkID& id,
1679 const addressing2::EndpointPtr local,
1680 const addressing2::EndpointPtr remote)
1681{
1682 logging_debug("Accepting link request from " << remote->to_string() );
1683
1684 // TODO ask application..?
1685
1686 return true;
1687}
1688
1689
1690
1691
1692/// handles a message from base communication
1693bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message,
1694 const LinkID& link,
1695 const NodeID&,
1696 bool bypass_overlay )
1697{
1698 // get descriptor for link
1699 LinkDescriptor* ld = getDescriptor( link, true );
1700
1701
1702 /* choose fastpath for direct links; normal overlay-path otherwise */
1703 if ( bypass_overlay && ld )
1704 {
1705 // message received --> link is alive
1706 ld->keepAliveReceived = time(NULL);
1707 // hop count on this link
1708 ld->hops = 0;
1709
1710
1711 // hand over to CommunicationListener (aka Application)
1712 CommunicationListener* lst = getListener(ld->service);
1713 if ( lst != NULL )
1714 {
1715 lst->onMessage(
1716 message,
1717 ld->remoteNode,
1718 ld->overlayId,
1719 SequenceNumber::DISABLED,
1720 NULL );
1721
1722 return true;
1723 }
1724
1725 return false;
1726 }
1727 else
1728 {
1729 return handleMessage( message, ld, link );
1730 }
1731}
1732
1733// ----------------------------------------------------------------------------
1734
1735/// Handle spovnet instance join requests
1736bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink )
1737{
1738 // decapsulate message
1739 JoinRequest joinReq;
1740 joinReq.deserialize_from_shared_buffer(message);
1741
1742 logging_info( "Received join request for spovnet " <<
1743 joinReq.getSpoVNetID().toString() );
1744
1745 // check spovnet id
1746 if( joinReq.getSpoVNetID() != spovnetId ) {
1747 logging_error(
1748 "Received join request for spovnet we don't handle " <<
1749 joinReq.getSpoVNetID().toString() );
1750
1751 return false;
1752 }
1753
1754 // TODO: here you can implement mechanisms to deny joining of a node
1755 bool allow = true;
1756 logging_info( "Sending join reply for spovnet " <<
1757 spovnetId.toString() << " to node " <<
1758 source.toString() <<
1759 ". Result: " << (allow ? "allowed" : "denied") );
1760 joiningNodes.push_back( source );
1761
1762 // return overlay parameters
1763 assert( overlayInterface != NULL );
1764 logging_debug( "Using bootstrap end-point "
1765 << getEndpointDescriptor().toString() )
1766 OverlayParameterSet parameters = overlayInterface->getParameters();
1767
1768
1769 // create JoinReplay Message
1770 OverlayMsg retmsg( OverlayMsg::typeJoinReply,
1771 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );
1772 JoinReply replyMsg( spovnetId, parameters, allow );
1773 retmsg.append_buffer(replyMsg.serialize_into_shared_buffer());
1774
1775 // XXX This is unlovely clash between the old message system and the new one,
1776 // but a.t.m. we can't migrate everything to the new system at once..
1777 // ---> Consider the EndpointDescriptor as part of the JoinReply..
1778 retmsg.append_buffer(getEndpointDescriptor().serialize());
1779
1780 // * send *
1781 send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY);
1782
1783 return true;
1784}
1785
1786/// Handle replies to spovnet instance join requests
1787bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink )
1788{
1789 // decapsulate message
1790 logging_debug("received join reply message");
1791 JoinReply replyMsg;
1792 EndpointDescriptor endpoints;
1793 reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message);
1794 buff = endpoints.deserialize(buff);
1795
1796 // correct spovnet?
1797 if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail
1798 logging_error( "Received SpoVNet join reply for " <<
1799 replyMsg.getSpoVNetID().toString() <<
1800 " != " << spovnetId.toString() );
1801
1802 return false;
1803 }
1804
1805 // access granted? no -> fail
1806 if( !replyMsg.getJoinAllowed() ) {
1807 logging_error( "Our join request has been denied" );
1808
1809 // drop initiator link
1810 if( !bcLink.isUnspecified() ){
1811 bc->dropLink( bcLink );
1812
1813 vector<LinkID>::iterator it = std::find(
1814 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1815 if( it != bootstrapLinks.end() )
1816 bootstrapLinks.erase(it);
1817 }
1818
1819 // inform all registered services of the event
1820 foreach( NodeListener* i, nodeListeners )
1821 i->onJoinFailed( spovnetId );
1822
1823 return true;
1824 }
1825
1826 // access has been granted -> continue!
1827 logging_info("Join request has been accepted for spovnet " <<
1828 spovnetId.toString() );
1829
1830 logging_debug( "Using bootstrap end-point "
1831 << endpoints.toString() );
1832
1833 // create overlay structure from spovnet parameter set
1834 // if we have not boostrapped yet against some other node
1835 if( overlayInterface == NULL ){
1836
1837 logging_debug("first-time bootstrapping");
1838
1839 overlayInterface = OverlayFactory::create(
1840 *this, replyMsg.getParam(), nodeId, this );
1841
1842 // overlay structure supported? no-> fail!
1843 if( overlayInterface == NULL ) {
1844 logging_error( "overlay structure not supported" );
1845
1846 if( !bcLink.isUnspecified() ){
1847 bc->dropLink( bcLink );
1848
1849 vector<LinkID>::iterator it = std::find(
1850 bootstrapLinks.begin(), bootstrapLinks.end(), bcLink);
1851 if( it != bootstrapLinks.end() )
1852 bootstrapLinks.erase(it);
1853 }
1854
1855 // inform all registered services of the event
1856 foreach( NodeListener* i, nodeListeners )
1857 i->onJoinFailed( spovnetId );
1858
1859 return true;
1860 }
1861
1862 // everything ok-> join the overlay!
1863 state = BaseOverlayStateCompleted;
1864 overlayInterface->createOverlay();
1865
1866 overlayInterface->joinOverlay( endpoints );
1867 overlayBootstrap.recordJoin( endpoints );
1868
1869 // update ovlvis
1870 //ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
1871
1872 // inform all registered services of the event
1873 foreach( NodeListener* i, nodeListeners )
1874 i->onJoinCompleted( spovnetId );
1875 }
1876 else
1877 {
1878 // this is not the first bootstrap, just join the additional node
1879 logging_debug("not first-time bootstrapping");
1880 overlayInterface->joinOverlay( endpoints );
1881 overlayBootstrap.recordJoin( endpoints );
1882 } // if( overlayInterface == NULL )
1883
1884 return true;
1885}
1886
1887
1888bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld )
1889{
1890 // get service
1891 const ServiceID& service = ld->service; //overlayMsg->getService();
1892
1893 logging_debug( "Received data for service " << service.toString()
1894 << " on link " << overlayMsg->getDestinationLink().toString() );
1895
1896 // delegate data message
1897 CommunicationListener* lst = getListener(service);
1898 if(lst != NULL){
1899 lst->onMessage(
1900 message,
1901// overlayMsg->getSourceNode(),
1902// overlayMsg->getDestinationLink(),
1903 ld->remoteNode,
1904 ld->overlayId,
1905 overlayMsg->getSeqNum(),
1906 overlayMsg
1907 );
1908 }
1909
1910 return true;
1911}
1912
1913bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg )
1914{
1915 /**
1916 * Deserialize MessageLost-Message
1917 *
1918 * - Type of lost message
1919 * - Hop count of lost message
1920 * - Source-LinkID of lost message
1921 */
1922 const uint8_t* buff = message(0, sizeof(uint8_t)*2).data();
1923 uint8_t type = buff[0];
1924 uint8_t hops = buff[1];
1925 LinkID linkid;
1926 linkid.deserialize(message(sizeof(uint8_t)*2));
1927
1928 logging_warn("Node " << msg->getSourceNode()
1929 << " informed us, that our message of type " << (int) type
1930 << " is lost after traveling " << (int) hops << " hops."
1931 << " (LinkID: " << linkid.toString());
1932
1933
1934 // TODO switch-case ?
1935
1936 // BRANCH: LinkRequest --> link request failed
1937 if ( type == OverlayMsg::typeLinkRequest )
1938 {
1939 __onLinkEstablishmentFailed(linkid);
1940 }
1941
1942 // BRANCH: Data --> link disrupted. Drop link.
1943 // (We could use something more advanced here. e.g. At least send a
1944 // keep-alive message and wait for a keep-alive reply.)
1945 if ( type == OverlayMsg::typeData )
1946 {
1947 LinkDescriptor* link_desc = getDescriptor(linkid);
1948
1949 if ( link_desc )
1950 {
1951 link_desc->failed = true;
1952 }
1953
1954 dropLink(linkid);
1955 }
1956
1957 // BRANCH: ping lost
1958 if ( type == OverlayMsg::typePing )
1959 {
1960 CommunicationListener* lst = getListener(msg->getService());
1961 if( lst != NULL )
1962 {
1963 lst->onPingLost(msg->getSourceNode());
1964 }
1965 }
1966
1967 return true;
1968}
1969
1970bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld )
1971{
1972 // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node)
1973
1974 bool send_pong = false;
1975
1976 // inform application and ask permission to send a pong message
1977 CommunicationListener* lst = getListener(overlayMsg->getService());
1978 if( lst != NULL )
1979 {
1980 send_pong = lst->onPing(overlayMsg->getSourceNode());
1981 }
1982
1983 // send pong message if allowed
1984 if ( send_pong )
1985 {
1986 OverlayMsg pong_msg(OverlayMsg::typePong);
1987 pong_msg.setSeqNum(overlayMsg->getSeqNum());
1988
1989 // send message
1990 try
1991 {
1992 send_node( &pong_msg,
1993 overlayMsg->getSourceNode(),
1994 system_priority::OVERLAY,
1995 overlayMsg->getService() );
1996 }
1997 catch ( message_not_sent& e )
1998 {
1999 logging_info("Could not send Pong-Message to node: " <<
2000 overlayMsg->getSourceNode());
2001 }
2002 }
2003}
2004
2005bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2006{
2007 // inform application
2008 CommunicationListener* lst = getListener(overlayMsg->getService());
2009 if( lst != NULL )
2010 {
2011 lst->onPong(overlayMsg->getSourceNode());
2012 }
2013}
2014
2015bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2016
2017 if( ld == NULL ) {
2018 logging_warn( "received overlay update message for link for "
2019 << "which we have no mapping" );
2020 return false;
2021 }
2022 logging_info("Received type update message on link " << ld );
2023
2024 // update our link mapping information for this link
2025 bool changed =
2026 ( ld->remoteNode != overlayMsg->getSourceNode() )
2027 || ( ld->service != overlayMsg->getService() );
2028
2029 // set parameters
2030 ld->up = true;
2031 ld->remoteNode = overlayMsg->getSourceNode();
2032 ld->remoteLink = overlayMsg->getSourceLink();
2033 ld->service = overlayMsg->getService();
2034 ld->autolink = overlayMsg->isAutoLink();
2035
2036 // if our link information changed, we send out an update, too
2037 if( changed ) {
2038 overlayMsg->swapRoles();
2039 overlayMsg->setSourceNode(nodeId);
2040 overlayMsg->setSourceLink(ld->overlayId);
2041 overlayMsg->setService(ld->service);
2042 send( overlayMsg, ld, system_priority::OVERLAY );
2043 }
2044
2045 // service registered? no-> error!
2046 if( !communicationListeners.contains( ld->service ) ) {
2047 logging_warn( "Link up: event listener has not been registered" );
2048 return false;
2049 }
2050
2051 // default or no service registered?
2052 CommunicationListener* listener = communicationListeners.get( ld->service );
2053 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) {
2054 logging_warn("Link up: event listener is default or null!" );
2055 return true;
2056 }
2057
2058 // update descriptor
2059 ld->listener = listener;
2060 ld->setAutoUsed();
2061 ld->setAlive();
2062
2063 // ask the service whether it wants to accept this link
2064 if( !listener->onLinkRequest(ld->remoteNode) ) {
2065
2066 logging_debug("Link id=" << ld->overlayId.toString() <<
2067 " has been denied by service " << ld->service.toString() << ", dropping link");
2068
2069 // prevent onLinkDown calls to the service
2070 ld->listener = &CommunicationListener::DEFAULT;
2071
2072 // drop the link
2073 dropLink( ld->overlayId );
2074 return true;
2075 }
2076
2077 // set link up
2078 ld->up = true;
2079 logging_info( "Link has been accepted by service and is up: " << ld );
2080
2081 // auto links: link has been accepted -> send queued messages
2082 if( ld->messageQueue.size() > 0 ) {
2083 logging_info( "Sending out queued messages on link " << ld );
2084 foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue )
2085 {
2086 sendMessage( msg.message, ld->overlayId, msg.priority );
2087 }
2088 ld->messageQueue.clear();
2089 }
2090
2091 // call the notification functions
2092 listener->onLinkUp( ld->overlayId, ld->remoteNode );
2093 sideport->onLinkUp( ld->overlayId, nodeId, ld->remoteNode, this->spovnetId );
2094
2095 return true;
2096}
2097
2098/// handle a link request and reply
2099bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2100
2101 //TODO: Check if a request has already been sent using getSourceLink() ...
2102
2103 // create link descriptor
2104 LinkDescriptor* ldn = addDescriptor();
2105
2106 // flags
2107 ldn->up = true;
2108 ldn->fromRemote = true;
2109 ldn->relayed = true;
2110
2111 // parameters
2112 ldn->service = overlayMsg->getService();
2113 ldn->listener = getListener(ldn->service);
2114 ldn->remoteNode = overlayMsg->getSourceNode();
2115 ldn->remoteLink = overlayMsg->getSourceLink();
2116 ldn->hops = overlayMsg->getNumHops();
2117
2118 // initialize sequence numbers
2119 ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short();
2120 logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum);
2121
2122
2123 // update time-stamps
2124 ldn->setAlive();
2125 ldn->setAutoUsed();
2126
2127 logging_info( "Link request received from node id="
2128 << overlayMsg->getSourceNode()
2129 << " LINK: "
2130 << ldn);
2131
2132 // create reply message and send back!
2133 overlayMsg->swapRoles(); // swap source/destination
2134 overlayMsg->setType(OverlayMsg::typeLinkReply);
2135 overlayMsg->setSourceLink(ldn->overlayId);
2136 overlayMsg->setRelayed(true);
2137// overlayMsg->setRouteRecord(true);
2138 overlayMsg->setSeqNum(ld->last_sent_seqnum);
2139
2140 // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!!
2141 // append our endpoints (for creation of a direct link)
2142 overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize());
2143
2144 send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link
2145
2146 // inform listener
2147 if(ldn != NULL && ldn->listener != NULL)
2148 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2149
2150 return true;
2151}
2152
2153bool BaseOverlay::handleLinkReply(
2154 OverlayMsg* overlayMsg,
2155 reboost::shared_buffer_t sub_message,
2156 LinkDescriptor* ld )
2157{
2158 // deserialize EndpointDescriptor
2159 EndpointDescriptor endpoints;
2160 endpoints.deserialize(sub_message);
2161
2162 // find link request
2163 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink());
2164
2165 // not found? yes-> drop with error!
2166 if (ldn == NULL) {
2167 logging_error( "No link request pending for "
2168 << overlayMsg->getDestinationLink().toString() );
2169 return false;
2170 }
2171 logging_debug("Handling link reply for " << ldn )
2172
2173 // check if already up
2174 if (ldn->up) {
2175 logging_warn( "Link already up: " << ldn );
2176 return true;
2177 }
2178
2179 // debug message
2180 logging_info( "Link request reply received. Establishing link"
2181 << " for service " << overlayMsg->getService().toString()
2182 << " with local id=" << overlayMsg->getDestinationLink()
2183 << " and remote link id=" << overlayMsg->getSourceLink()
2184 << " to " << endpoints.toString()
2185 << " hop count: " << overlayMsg->getRouteRecord().size()
2186 );
2187
2188 // set local link descriptor data
2189 ldn->up = true;
2190 ldn->relayed = true;
2191 ldn->service = overlayMsg->getService();
2192 ldn->listener = getListener(ldn->service);
2193 ldn->remoteLink = overlayMsg->getSourceLink();
2194 ldn->remoteNode = overlayMsg->getSourceNode();
2195
2196 // update timestamps
2197 ldn->setAlive();
2198 ldn->setAutoUsed();
2199
2200 // auto links: link has been accepted -> send queued messages
2201 if( ldn->messageQueue.size() > 0 ) {
2202 logging_info( "Sending out queued messages on link " <<
2203 ldn->overlayId.toString() );
2204 foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue )
2205 {
2206 sendMessage( msg.message, ldn->overlayId, msg.priority );
2207 }
2208 ldn->messageQueue.clear();
2209 }
2210
2211 // inform listeners about new link
2212 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode );
2213
2214 // try to replace relay link with direct link
2215 ldn->retryCounter = 3;
2216 ldn->endpoint = endpoints;
2217 ldn->communicationId = bc->establishLink( ldn->endpoint );
2218
2219 return true;
2220}
2221
2222/// handle a keep-alive message for a link
2223bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld )
2224{
2225 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink());
2226
2227 if ( rld != NULL )
2228 {
2229 logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() );
2230 if (overlayMsg->isRouteRecord())
2231 {
2232 rld->routeRecord = overlayMsg->getRouteRecord();
2233 }
2234
2235 // set alive
2236 rld->setAlive();
2237
2238
2239 /* answer keep alive */
2240 if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive )
2241 {
2242 time_t now = time(NULL);
2243 logging_debug("[BaseOverlay] Answering KeepAlive over "
2244 << ld->to_string()
2245 << " after "
2246 << difftime( now, ld->keepAliveSent )
2247 << "s");
2248
2249 OverlayMsg msg( OverlayMsg::typeKeepAliveReply,
2250 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );
2251 msg.setRouteRecord(true);
2252 ld->keepAliveSent = now;
2253 send_link( &msg, ld->overlayId, system_priority::OVERLAY );
2254 }
2255
2256 return true;
2257 }
2258 else
2259 {
2260 logging_error("No Keep-Alive for "
2261 << overlayMsg->getDestinationLink() << ": link unknown." );
2262 return false;
2263 }
2264}
2265
2266/// handle a direct link message
2267bool BaseOverlay::handleLinkDirect( OverlayMsg* overlayMsg, LinkDescriptor* ld ) {
2268 logging_debug( "Received direct link replacement request" );
2269
2270 /// get destination overlay link
2271 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() );
2272 if (rld == NULL || ld == NULL) {
2273 logging_error("Direct link replacement: Link "
2274 << overlayMsg->getDestinationLink() << "not found error." );
2275 return false;
2276 }
2277 logging_info( "Received direct link convert notification for " << rld );
2278
2279 // update information
2280 rld->communicationId = ld->communicationId;
2281 rld->communicationUp = true;
2282 rld->relayed = false;
2283
2284 // mark used and alive!
2285 rld->setAlive();
2286 rld->setAutoUsed();
2287
2288 // erase the original descriptor
2289 eraseDescriptor(ld->overlayId);
2290
2291 // inform listener
2292 if( rld->listener != NULL)
2293 rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode );
2294
2295 return true;
2296}
2297
2298/// handles an incoming message
2299bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld,
2300 const LinkID bcLink )
2301{
2302 // decapsulate overlay message
2303 OverlayMsg* overlayMsg = new OverlayMsg();
2304 reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message);
2305
2306// // XXX debug
2307// logging_info( "Received overlay message."
2308// << " Hops: " << (int) overlayMsg->getNumHops()
2309// << " Type: " << (int) overlayMsg->getType()
2310// << " Payload size: " << sub_buff.size()
2311// << " SeqNum: " << overlayMsg->getSeqNum() );
2312
2313
2314 // increase number of hops
2315 overlayMsg->increaseNumHops();
2316
2317 // refresh relay information
2318 refreshRelayInformation( overlayMsg, ld );
2319
2320 // update route record
2321 overlayMsg->addRouteRecord(nodeId);
2322
2323 // handle signaling messages (do not route!)
2324 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart &&
2325 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd )
2326 {
2327 overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED);
2328 delete overlayMsg;
2329 return true;
2330 }
2331
2332 // message for reached destination? no-> route message
2333 if (!overlayMsg->getDestinationNode().isUnspecified() &&
2334 overlayMsg->getDestinationNode() != nodeId ) {
2335 logging_debug("Routing message "
2336 << " from " << overlayMsg->getSourceNode()
2337 << " to " << overlayMsg->getDestinationNode()
2338 );
2339
2340// // XXX testing AKTUELL
2341// logging_info("MARIO: Routing message "
2342// << " from " << overlayMsg->getSourceNode()
2343// << " to " << overlayMsg->getDestinationNode() );
2344// logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size());
2345 overlayMsg->append_buffer(sub_buff);
2346
2347 route( overlayMsg, ld->remoteNode );
2348 delete overlayMsg;
2349 return true;
2350 }
2351
2352
2353 /* handle base overlay message */
2354 bool ret = false; // return value
2355 try
2356 {
2357 switch ( overlayMsg->getType() )
2358 {
2359 // data transport messages
2360 case OverlayMsg::typeData:
2361 {
2362 // NOTE: On relayed links, »ld« does not point to our link, but on the relay link.
2363 LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink());
2364
2365 if ( ! end_to_end_ld )
2366 {
2367 logging_warn("Error: Data-Message claims to belong to a link we don't know.");
2368
2369 ret = false;
2370 }
2371 else
2372 {
2373 // message received --> link is alive
2374 end_to_end_ld->keepAliveReceived = time(NULL);
2375 // hop count on this link
2376 end_to_end_ld->hops = overlayMsg->getNumHops();
2377
2378 // * call handler *
2379 ret = handleData(sub_buff, overlayMsg, end_to_end_ld);
2380 }
2381
2382 break;
2383 }
2384 case OverlayMsg::typeMessageLost:
2385 ret = handleLostMessage(sub_buff, overlayMsg);
2386
2387 break;
2388
2389 // overlay setup messages
2390 case OverlayMsg::typeJoinRequest:
2391 ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break;
2392 case OverlayMsg::typeJoinReply:
2393 ret = handleJoinReply(sub_buff, bcLink ); break;
2394
2395 // link specific messages
2396 case OverlayMsg::typeLinkRequest:
2397 ret = handleLinkRequest(overlayMsg, ld ); break;
2398 case OverlayMsg::typeLinkReply:
2399 ret = handleLinkReply(overlayMsg, sub_buff, ld ); break;
2400 case OverlayMsg::typeLinkUpdate:
2401 ret = handleLinkUpdate(overlayMsg, ld ); break;
2402 case OverlayMsg::typeKeepAlive:
2403 case OverlayMsg::typeKeepAliveReply:
2404 ret = handleLinkAlive(overlayMsg, ld ); break;
2405 case OverlayMsg::typeLinkDirect:
2406 ret = handleLinkDirect(overlayMsg, ld ); break;
2407
2408 case OverlayMsg::typeLinkClose:
2409 {
2410 dropLink(overlayMsg->getDestinationLink());
2411 __removeDroppedLink(overlayMsg->getDestinationLink());
2412
2413 break;
2414 }
2415
2416 /// ping over overlay path (or similar)
2417 case OverlayMsg::typePing:
2418 {
2419 ret = handlePing(overlayMsg, ld);
2420 break;
2421 }
2422 case OverlayMsg::typePong:
2423 {
2424 ret = handlePong(overlayMsg, ld);
2425 break;
2426 }
2427
2428 // handle unknown message type
2429 default:
2430 {
2431 logging_error( "received message in invalid state! don't know " <<
2432 "what to do with this message of type " << overlayMsg->getType() );
2433 ret = false;
2434 break;
2435 }
2436 }
2437 }
2438 catch ( reboost::illegal_sub_buffer& e )
2439 {
2440 logging_error( "Failed to create sub-buffer while reading message: »"
2441 << e.what()
2442 << "« Message too short? ");
2443
2444 assert(false); // XXX
2445 }
2446
2447 // free overlay message and return value
2448 delete overlayMsg;
2449 return ret;
2450}
2451
2452// ----------------------------------------------------------------------------
2453
2454void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) {
2455
2456 logging_debug( "broadcasting message to all known nodes " <<
2457 "in the overlay from service " + service.toString() );
2458
2459 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true);
2460 for(size_t i=0; i<nodes.size(); i++){
2461 NodeID& id = nodes.at(i);
2462 if(id == this->nodeId) continue; // don't send to ourselfs
2463
2464 sendMessage( message, id, priority, service );
2465 }
2466}
2467
2468/// return the overlay neighbors
2469vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const {
2470 // the known nodes _can_ also include our node, so we remove ourself
2471 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep);
2472 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
2473 if( i != nodes.end() ) nodes.erase( i );
2474 return nodes;
2475}
2476
2477const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
2478 if( lid == LinkID::UNSPECIFIED ) return nodeId;
2479 const LinkDescriptor* ld = getDescriptor(lid);
2480 if( ld == NULL ) return NodeID::UNSPECIFIED;
2481 else return ld->remoteNode;
2482}
2483
2484vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
2485 vector<LinkID> linkvector;
2486 foreach( LinkDescriptor* ld, links ) {
2487 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) {
2488 linkvector.push_back( ld->overlayId );
2489 }
2490 }
2491 return linkvector;
2492}
2493
2494
2495void BaseOverlay::onNodeJoin(const NodeID& node) {
2496 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
2497 if( i == joiningNodes.end() ) return;
2498
2499 logging_info( "node has successfully joined baseoverlay and overlay structure "
2500 << node.toString() );
2501
2502 joiningNodes.erase( i );
2503}
2504
2505void BaseOverlay::eventFunction() {
2506 stabilizeRelays();
2507 stabilizeLinks();
2508 updateVisual();
2509}
2510
2511
2512
2513/* link status */
2514bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const
2515{
2516 const LinkDescriptor* ld = getDescriptor(lnk);
2517
2518 if (!ld)
2519 return false;
2520
2521 return ld->communicationUp && !ld->relayed;
2522}
2523
2524int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const
2525{
2526 const LinkDescriptor* ld = getDescriptor(lnk);
2527
2528 if (!ld)
2529 return -1;
2530
2531 return ld->hops;
2532}
2533
2534
2535bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const
2536{
2537 time_t now = time(NULL);
2538
2539 return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..?
2540}
2541
2542bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const
2543{
2544 return isLinkVital(link) && link->communicationUp && !link->relayed;
2545}
2546
2547/* [link status] */
2548
2549
2550void BaseOverlay::updateVisual(){
2551
2552 //
2553 // update base overlay structure
2554 //
2555
2556 static NodeID pre = NodeID::UNSPECIFIED;
2557 static NodeID suc = NodeID::UNSPECIFIED;
2558
2559 vector<NodeID> nodes = this->getOverlayNeighbors(false);
2560
2561 if(nodes.size() == 0){
2562
2563 if(pre != NodeID::UNSPECIFIED){
2564 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2565 pre = NodeID::UNSPECIFIED;
2566 }
2567 if(suc != NodeID::UNSPECIFIED){
2568 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2569 suc = NodeID::UNSPECIFIED;
2570 }
2571
2572 } // if(nodes.size() == 0)
2573
2574 if(nodes.size() == 1){
2575 // only one node, make this pre and succ
2576 // and then go into the node.size()==2 case
2577 //nodes.push_back(nodes.at(0));
2578
2579 if(pre != nodes.at(0)){
2580 pre = nodes.at(0);
2581 if(pre != NodeID::UNSPECIFIED)
2582 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2583 }
2584 }
2585
2586 if(nodes.size() == 2){
2587
2588 // old finger
2589 if(nodes.at(0) != pre){
2590 if(pre != NodeID::UNSPECIFIED)
2591 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, pre, "");
2592 pre = NodeID::UNSPECIFIED;
2593 }
2594 if(nodes.at(1) != suc){
2595 if(suc != NodeID::UNSPECIFIED)
2596 visualInstance.visDisconnect(visualIdOverlay, this->nodeId, suc, "");
2597 suc = NodeID::UNSPECIFIED;
2598 }
2599
2600 // connect with fingers
2601 if(pre == NodeID::UNSPECIFIED){
2602 pre = nodes.at(0);
2603 if(pre != NodeID::UNSPECIFIED)
2604 visualInstance.visConnect(visualIdOverlay, this->nodeId, pre, "");
2605 }
2606 if(suc == NodeID::UNSPECIFIED){
2607 suc = nodes.at(1);
2608 if(suc != NodeID::UNSPECIFIED)
2609 visualInstance.visConnect(visualIdOverlay, this->nodeId, suc, "");
2610 }
2611
2612 } //if(nodes.size() == 2)
2613
2614// {
2615// logging_error("================================");
2616// logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16));
2617// logging_error("================================");
2618// if(nodes.size()>= 1){
2619// logging_error("real pre " << nodes.at(0).toString());
2620// logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16));
2621// }
2622// if(nodes.size()>= 2){
2623// logging_error("real suc " << nodes.at(1).toString());
2624// logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16));
2625// }
2626// logging_error("================================");
2627// if(pre == NodeID::UNSPECIFIED){
2628// logging_error("pre: unspecified");
2629// }else{
2630// unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16);
2631// logging_error("pre: " << prei);
2632// }
2633// if(suc == NodeID::UNSPECIFIED){
2634// logging_error("suc: unspecified");
2635// }else{
2636// unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16);
2637// logging_error("suc: " << suci);
2638// }
2639// logging_error("================================");
2640// }
2641
2642 //
2643 // update base communication links
2644 //
2645
2646 static set<NodeID> linkset;
2647 set<NodeID> remotenodes;
2648 foreach( LinkDescriptor* ld, links ) {
2649 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)
2650 continue;
2651
2652 if (ld->routeRecord.size()>1 && ld->relayed) {
2653 for (size_t i=1; i<ld->routeRecord.size(); i++)
2654 remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] );
2655 } else {
2656 remotenodes.insert(ld->remoteNode);
2657 }
2658 }
2659
2660 // which links are old and need deletion?
2661 bool changed = false;
2662
2663 do{
2664 changed = false;
2665 foreach(NodeID n, linkset){
2666 if(remotenodes.find(n) == remotenodes.end()){
2667 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, "");
2668 linkset.erase(n);
2669 changed = true;
2670 break;
2671 }
2672 }
2673 }while(changed);
2674
2675 // which links are new and need creation?
2676 do{
2677 changed = false;
2678 foreach(NodeID n, remotenodes){
2679 if(linkset.find(n) == linkset.end()){
2680 visualInstance.visConnect(visualIdBase, this->nodeId, n, "");
2681 linkset.insert(n);
2682 changed = true;
2683 break;
2684 }
2685 }
2686 }while(changed);
2687
2688}
2689
2690// ----------------------------------------------------------------------------
2691
2692std::string BaseOverlay::debugInformation() {
2693 std::stringstream s;
2694 int i=0;
2695
2696 // dump overlay information
2697 s << "Long debug info ... [see below]" << endl << endl;
2698 s << "--- overlay information ----------------------" << endl;
2699 s << overlayInterface->debugInformation() << endl;
2700
2701 // dump link state
2702 s << "--- link state -------------------------------" << endl;
2703 foreach( LinkDescriptor* ld, links ) {
2704 s << "link " << i << ": " << ld << endl;
2705 i++;
2706 }
2707 s << endl << endl;
2708
2709 return s.str();
2710}
2711
2712}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.