source: source/ariba/overlay/BaseOverlay.cpp@ 4836

Last change on this file since 4836 was 4836, checked in by Christoph Mayer, 15 years ago

einige avahi fixes und ablauf

File size: 35.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include "ariba/utility/misc/OvlVis.h"
42#include "ariba/NodeListener.h"
43#include "ariba/CommunicationListener.h"
44#include "ariba/SideportListener.h"
45
46#include "ariba/overlay/messages/OverlayMsg.h"
47#include "ariba/overlay/messages/JoinRequest.h"
48#include "ariba/overlay/messages/JoinReply.h"
49#include "ariba/overlay/messages/LinkRequest.h"
50
51namespace ariba {
52namespace overlay {
53
54use_logging_cpp(BaseOverlay);
55
56BaseOverlay::BaseOverlay()
57 : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
58 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED),
59 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT){
60}
61
62BaseOverlay::~BaseOverlay(){
63}
64
65void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){
66
67 bc = &_basecomm;
68 nodeId = _nodeid;
69
70 logging_info("creating base overlay");
71
72 bc->registerMessageReceiver( this );
73 bc->registerEventListener( this );
74
75 ovl.visCreate( ovlId, nodeId, string(""), string("") );
76 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);
77
78// if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
79// Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {
80// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);
81// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
82// Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {
83// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);
84// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
85// Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {
86// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
87// }
88
89 // timer for auto link management
90 Timer::setInterval( 5000 );
91 Timer::start();
92}
93
94void BaseOverlay::stop() {
95
96 logging_info("deleting base overlay");
97
98 Timer::stop();
99 bc->unregisterMessageReceiver( this );
100 bc->unregisterEventListener( this );
101
102 if(overlayInterface != NULL){
103 delete overlayInterface;
104 overlayInterface = NULL;
105 }
106}
107
108void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){
109
110 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
111 logging_info( "starting to join spovnet " << id.toString() <<
112 " with nodeid " << nodeId.toString());
113
114 spovnetId = id;
115 state = BaseOverlayStateJoinInitiated;
116
117
118 //
119 // start bootstrapping for spovnetid
120 //
121
122 overlayBootstrap.start( this, spovnetId, nodeId );
123 overlayBootstrap.publish( bc->getEndpointDescriptor() );
124
125 //
126 // contact the spovnet initiator and request
127 // to join. if the join is granted we will
128 // receive further information on the structure
129 // of the overlay that is used in the spovnet
130 //
131 // but first, we have to establish a link to the initiator...
132 //
133
134 initiatorLink = bc->establishLink( bootstrapEp );
135 logging_info("join process initiated for " << id.toString() << "...");
136}
137
138void BaseOverlay::leaveSpoVNet(){
139
140 logging_info( "leaving spovnet " << spovnetId );
141 bool ret = ( state != this->BaseOverlayStateInvalid );
142
143 logging_debug( "dropping all auto-links ..." );
144
145 // now we start leaving the spovnet: fist delete all links
146 // that we still have in the baseoverlay initiated by
147 // some services, the leave the actual overlay structure,
148 // then leave the spovnet
149
150 // --> drop all service links
151
152 vector<LinkID> servicelinks;
153 BOOST_FOREACH( LinkPair item, linkMapping ){
154 if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID )
155 servicelinks.push_back( item.first );
156 }
157 BOOST_FOREACH( LinkID lnk, servicelinks ){
158 // the dropLink function will remove
159 // the item from the linkMapping
160 dropLink( lnk );
161 }
162
163 // --> leave overlay structure
164
165 logging_debug( "leaving overlay" );
166 // first, leave the overlay interface
167 if( overlayInterface != NULL )
168 overlayInterface->leaveOverlay();
169
170 // --> leave spovnet
171
172 if( state != BaseOverlayStateInitiator ){
173
174 // then, leave the spovnet baseoverlay
175 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
176 bc->sendMessage( initiatorLink, &overMsg );
177
178 // drop the link and set to correct state
179 bc->dropLink( initiatorLink );
180 initiatorLink = LinkID::UNSPECIFIED;
181 }
182
183 state = BaseOverlayStateInvalid;
184 ovl.visShutdown( ovlId, nodeId, string("") );
185
186 // inform all registered services of the event
187 BOOST_FOREACH( NodeListener* i, nodeListeners ){
188 if( ret ) i->onLeaveCompleted( spovnetId );
189 else i->onLeaveFailed( spovnetId );
190 }
191}
192
193void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){
194
195 // set the state that we are an initiator, this way incoming messages are
196 // handled correctly
197 logging_info( "creating spovnet " + id.toString() <<
198 " with nodeid " << nodeId.toString() );
199
200 spovnetId = id;
201 state = BaseOverlayStateInitiator;
202
203 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
204 if( overlayInterface == NULL ){
205 logging_fatal( "overlay structure not supported" );
206 state = BaseOverlayStateInvalid;
207 return;
208 }
209
210 // bootstrap against ourselfs
211 overlayInterface->joinOverlay();
212 BOOST_FOREACH( NodeListener* i, nodeListeners )
213 i->onJoinCompleted( spovnetId );
214
215 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
216 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
217}
218
219
220/// establishes a link between two arbitrary nodes
221const LinkID BaseOverlay::establishLink( const NodeID& node,
222 const ServiceID& service, const LinkID& link_id ) {
223
224 if( !communicationListeners.contains( service ) ){
225 logging_error( "no registered listener on serviceid " << service.toString() );
226 return LinkID::UNSPECIFIED;
227 }
228
229 // copy link id
230 LinkID linkid = link_id;
231
232 // create link id if necessary
233 if (linkid.isUnspecified()) linkid = LinkID::create();
234
235 // debug message
236 logging_debug( "BaseOverlay called to establish link between node " <<
237 node.toString() << " for service " << service.toString() );
238
239 // create link request message with own link id
240 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
241 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
242 LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() );
243 overlay_msg.encapsulate( &link_request_msg );
244 pendingLinks.insert( make_pair(nonce, linkid) );
245
246 // debug message
247 logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() );
248
249 // route message to overlay node
250 overlayInterface->routeMessage( node, &overlay_msg );
251
252 CommunicationListener* receiver = communicationListeners.get( service );
253 assert( receiver != NULL );
254
255 LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver);
256 linkMapping.insert( make_pair(linkid, item) );
257
258 return linkid;
259}
260
261const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
262 const ServiceID& service, const LinkID& linkid ){
263
264 if( !communicationListeners.contains( service ) ){
265 logging_error( "no registered listener on serviceid " << service.toString() );
266 return LinkID::UNSPECIFIED;
267 }
268
269 const LinkID link = bc->establishLink( ep, linkid );
270
271 CommunicationListener* receiver = communicationListeners.get( service );
272 assert( receiver != NULL );
273
274 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver);
275 linkMapping.insert( make_pair(link, item) );
276
277 return link;
278}
279
280void BaseOverlay::dropLink(const LinkID& link){
281
282 logging_debug( "baseoverlay dropping link " << link.toString() );
283 LinkMapping::iterator i = linkMapping.find( link );
284
285 // find the link item to drop
286 if( i == linkMapping.end() ){
287 logging_warn( "can't drop link, mapping unknown " << link.toString() );
288 return;
289 }
290
291 LinkItem item = i->second;
292
293 // delete all queued messages
294 if( item.waitingmsg.size() > 0 ){
295
296 logging_warn( "dropping link " << link.toString() <<
297 " that has " << item.waitingmsg.size() << " waiting messages" );
298
299 item.deleteWaiting();
300 }
301
302 // erase the mapping and drop the link
303 linkMapping.erase( i );
304 bc->dropLink( link );
305
306 // tell sideports and listeners of the drop
307 item.interface->onLinkDown( link, item.node );
308 sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId );
309}
310
311seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){
312
313 logging_debug( "baseoverlay is sending data message on link " << link.toString() );
314
315 //
316 // get the mapping for this link
317 //
318
319 LinkMapping::iterator i = linkMapping.find( link );
320 if( i == linkMapping.end() ){
321 logging_error( "could not send message. link not found " << link.toString() );
322 return -1;
323 }
324
325 i->second.markused();
326
327 //
328 // check if the link is up yet, if its an autlink queue message
329 //
330
331 if( !i->second.linkup ){
332
333 if( i->second.autolink ){
334 logging_info( "auto link " << link.toString() << " is not up yet, queueing message" );
335 Data data = data_serialize( message );
336 const_cast<Message*>(message)->dropPayload();
337 i->second.waitingmsg.push_back( new Message(data) );
338 } else {
339 logging_error("link " << link.toString() << " is not up yet, dropping message" );
340 }
341
342 return -1;
343 }
344
345 //
346 // send the message through the basecomm
347 //
348
349 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
350 overmsg.encapsulate( const_cast<Message*>(message) );
351
352 return bc->sendMessage( link, &overmsg );
353}
354
355seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){
356
357 LinkID link = LinkID::UNSPECIFIED;
358
359 LinkMapping::iterator i = linkMapping.begin();
360 LinkMapping::iterator iend = linkMapping.end();
361
362 //
363 // see if we find a link for this node and service destination
364 //
365
366 for( ; i != iend; i++ ){
367 if( i->second.node == node && i->second.service == service ){
368 link = i->second.link;
369 break;
370 }
371 }
372
373 //
374 // if we found no link, create an auto link
375 //
376
377 if( link == LinkID::UNSPECIFIED ){
378
379 logging_info( "no link could be found to send message to node " <<
380 node.toString() << " for service " << service.toString() <<
381 ". creating auto link ...");
382
383 // call basecomm to create a link
384 link = establishLink( node, service );
385
386 // this will call onlinkup on us, if everything worked we now have a mapping
387 LinkMapping::iterator i = linkMapping.find( link );
388 i->second.autolink = true;
389
390 if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){
391 logging_error( "failed to establish auto link to node " << node.toString() <<
392 " for service " << service.toString() );
393 return -1;
394 }
395
396 logging_debug( "establishing autolink in progress to node "
397 << node.toString() << " with new link-id " << link.toString() );
398
399 } // if( link != LinkID::UNSPECIFIED )
400
401 assert( link != LinkID::UNSPECIFIED );
402
403 // mark the link as used, as we
404 // now send a message through it
405 i->second.markused();
406
407 // send the message through the new link. the link may not be functional,
408 // but for us there is a link-id so we can send messages through it. if
409 // the link is not yet up and the message needs to be cached, this is the
410 // task of the BaseCommunication, it will cache and send it later.
411 return sendMessage( message, link );
412}
413
414const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const {
415
416 return bc->getEndpointDescriptor( link );
417}
418
419const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {
420
421 if( node == nodeId || node == NodeID::UNSPECIFIED )
422 return bc->getEndpointDescriptor();
423
424 if( overlayInterface == NULL ){
425 logging_error( "overlay interface not set, cannot resolve endpoint" );
426 return EndpointDescriptor::UNSPECIFIED;
427 }
428
429 // TODO: if this is not a onehop overlay the operation will go asynchronously
430 return overlayInterface->resolveNode( node );
431}
432
433
434bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){
435 logging_debug( "binding communication listener " << listener
436 << " on serviceid " << sid.toString() );
437
438 if( communicationListeners.contains( sid ) ){
439 logging_error( "some listener already registered for service id "
440 << sid.toString() );
441 return false;
442 }
443
444 communicationListeners.registerItem( listener, sid );
445 return true;
446}
447
448bool BaseOverlay::registerSidePort(SideportListener* _sideport){
449 sideport = _sideport;
450 _sideport->configure( this );
451}
452
453bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){
454 sideport = &SideportListener::DEFAULT;
455}
456
457bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){
458 logging_debug( "unbinding listener " << listener
459 << " from serviceid " << sid.toString() );
460
461 if( !communicationListeners.contains( sid ) ){
462 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
463 return false;
464 }
465
466 if( communicationListeners.get(sid) != listener ){
467 logging_warn( "listener bound to service id " << sid.toString()
468 << " is different than listener trying to unbind" );
469 return false;
470 }
471
472 communicationListeners.unregisterItem( sid );
473 return true;
474}
475
476bool BaseOverlay::bind(NodeListener* listener){
477 logging_debug( "binding node listener " << listener );
478
479 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
480 if( i != nodeListeners.end() ){
481 logging_warn( "node listener " << listener << " is already bound, cannot bind" );
482 return false;
483 }
484
485 nodeListeners.push_back( listener );
486 return true;
487}
488
489bool BaseOverlay::unbind(NodeListener* listener){
490 logging_debug( "unbinding node listener " << listener );
491
492 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
493 if( i == nodeListeners.end() ){
494 logging_warn( "node listener " << listener << " is not bound, cannot unbind" );
495 return false;
496 }
497
498 nodeListeners.erase( i );
499 return true;
500}
501
502void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
503
504 logging_debug( "base overlay received linkup event " + id.toString() );
505 // TODO: updateOvlVis( getNodeID(id) );
506
507 //
508 // if we get up a link while we are in the
509 // join phase and this is the link that
510 // we have initiated towards the spovnet owner
511 // continue the join process by sending
512 // a join request message through the link
513 //
514
515 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){
516
517 logging_info(
518 "Join has been initiated by me and the link is now up. " <<
519 "sending out join request for SpoVNet " << spovnetId.toString()
520 );
521
522 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
523 JoinRequest joinmsg( spovnetId, nodeId );
524 overMsg.encapsulate( &joinmsg );
525
526 state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated
527 bc->sendMessage( id, &overMsg );
528
529 return;
530
531 } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink)
532
533 //
534 // otherwise this is a link initiated by a service
535 // then we exchange update messages to exchange the
536 // service id and node id for the link. in this case
537 // we should have a link mapping for this link. if
538 // we have no link mapping this link was initiated by
539 // the remote side.
540 //
541
542 LinkMapping::iterator i = linkMapping.find( id );
543
544 if( i == linkMapping.end() ){
545
546 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT );
547 linkMapping.insert( make_pair(id, item) );
548
549 } else {
550
551 logging_debug( "sending out OverlayMessageTypeUpdate" <<
552 " for service " << i->second.service.toString() <<
553 " with local node id " << nodeId.toString() <<
554 " on link " << id.toString() );
555
556 OverlayMsg overMsg(
557 OverlayMsg::OverlayMessageTypeUpdate,
558 i->second.service,
559 nodeId
560 );
561
562 bc->sendMessage( id, &overMsg );
563 i->second.markused();
564
565 } // if( i == linkMapping.end() )
566
567 // the link is only valid for the service when we receive
568 // the OverlayMessageTypeUpdate from the remote node and
569 // have the nodeid and serviceid for the link!
570}
571
572void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
573
574 logging_debug( "link went down " << id.toString() );
575
576 //
577 // tell the service that the link went
578 // down and remove the mapping
579 //
580
581 LinkMapping::iterator i = linkMapping.find( id );
582 if( i == linkMapping.end() ) {
583 // this can also be one of the baseoverlay links that
584 // no mapping is stored for. therefore we issue no warning.
585 // it can also be a link that has been dropped and the
586 // mapping is already deleted in the dropLink function.
587 // also, the service notification is issued then in dropLink
588 return;
589 }
590
591 i->second.interface->onLinkDown( id, i->second.node );
592 sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId );
593
594 // delete all queued messages
595 if( i->second.waitingmsg.size() > 0 ){
596
597 logging_warn( "dropping link " << id.toString() <<
598 " that has " << i->second.waitingmsg.size() << " waiting messages" );
599
600 i->second.deleteWaiting();
601 }
602
603 linkMapping.erase( i );
604}
605
606void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){
607
608 logging_debug( "link changed " << id.toString() );
609
610 //
611 // tell the service that the link changed
612 //
613
614 LinkMapping::iterator i = linkMapping.find( id );
615 if( i == linkMapping.end() ) return;
616
617 i->second.interface->onLinkChanged( id, i->second.node );
618 sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId );
619
620 // TODO call onLinkQoSChanged?
621
622 i->second.markused();
623}
624
625void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
626
627 logging_debug( "link failed " << id.toString() );
628
629 //
630 // tell the service that the link failed
631 //
632
633 LinkMapping::iterator i = linkMapping.find( id );
634 if( i == linkMapping.end() ) return;
635
636 i->second.interface->onLinkFail( id, i->second.node );
637 sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId );
638
639 i->second.markused();
640}
641
642void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {
643
644 logging_debug( "link qos changed " << id.toString() );
645
646 //
647 // tell the service that the link qos has changed
648 //
649
650 LinkMapping::iterator i = linkMapping.find( id );
651 if( i == linkMapping.end() ) return;
652
653 // TODO: convert QoSParameterSet to the LinkProperties properties
654 // TODO: currently not in the interface: i->second.interface->onLinkQoSChanged( id, i->second.node, LinkProperties::DEFAULT );
655
656 i->second.markused();
657}
658
659bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){
660
661 // also see in the receiveMessage function. there the higher layer service
662 // is asked whether to accept link requests, but there a basic link association is
663 // already built up, so we know the node id
664 logging_debug("received link request from " << remote->toString() << ", accepting");
665 return true;
666}
667
668
669bool BaseOverlay::receiveMessage(const Message* message,
670 const LinkID& link, const NodeID&
671 /*the nodeid is invalid in this case! removed var to prevent errors*/ ){
672
673 // decapsulate overlay message
674 logging_debug( "receiveMessage: " << message->toString());
675 OverlayMsg* overlayMsg = const_cast<Message*>(message)->decapsulate<OverlayMsg>();
676 if( overlayMsg == NULL ) return false;
677
678 // mark the link as in action
679 LinkMapping::iterator item = linkMapping.find( link );
680 if( item != linkMapping.end() ) item->second.markused();
681
682 /* ************************************************************************
683 /* handle user date that we forward to the appropriate service using the
684 * service id in the message. as we don't know the class of message that
685 * the service handles, we forward it as a pure Message
686 */
687 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {
688
689 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );
690
691 const ServiceID& service = overlayMsg->getService();
692 CommunicationListener* serviceListener = communicationListeners.get( service );
693
694 logging_debug( "received data for service " << service.toString() );
695
696 if( serviceListener != NULL )
697 serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link );
698
699 return true;
700
701 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) )
702
703 /* ************************************************************************
704 /* Handle spovnet instance join requests
705 */
706 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) ){
707
708 logging_debug(
709 "baseoverlay received message of type OverlayMessageTypeJoinRequest"
710 );
711
712 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
713 logging_info( "received join request for spovnet " <<
714 joinReq->getSpoVNetID().toString() );
715
716 /* make sure that the node actually wants to join
717 * the correct spovnet id that we administrate */
718 if( joinReq->getSpoVNetID() != spovnetId ){
719 logging_error( "received join request for spovnet we don't handle " <<
720 joinReq->getSpoVNetID().toString() );
721 return false;
722 }
723
724 //
725 // only if all services allow the node to join it is allowed
726 // using the isJoinAllowed interface security policies can be
727 // implemented by higher layer services
728 //
729
730 // TODO: here you can implement mechanisms to deny joining of a node
731 bool allow = true;
732
733 logging_info( "sending back join reply for spovnet " <<
734 spovnetId.toString() << " to node " <<
735 overlayMsg->getSourceNode().toString() <<
736 ". result: " << (allow ? "allowed" : "denied") );
737
738 joiningNodes.push_back( overlayMsg->getSourceNode() );
739
740 //
741 // send back our spovnetid, default overlay parameters, join allow
742 // result, and ourself as the end-point to bootstrap the overlay against
743 //
744
745 assert( overlayInterface != NULL );
746 OverlayParameterSet parameters = overlayInterface->getParameters();
747
748 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
749 JoinReply replyMsg( spovnetId, parameters,
750 allow, getEndpointDescriptor() );
751
752 retmsg.encapsulate(&replyMsg);
753 bc->sendMessage( link, &retmsg );
754
755 return true;
756
757 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest))
758
759 /* ************************************************************************
760 * handle replies to spovnet instance join requests
761 */
762 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
763 state == BaseOverlayStateJoinInitiated){
764
765 logging_debug(
766 "baseoverlay received message of type OverlayMessageTypeJoinReply");
767
768 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
769 logging_info( "received spovnet join reply" );
770
771 // ensure that we actually wanted to get into the spovnet whose id is
772 // in the message
773 if( replyMsg->getSpoVNetID() != spovnetId ){
774 logging_error( "received spovnet join reply for spovnet " <<
775 replyMsg->getSpoVNetID().toString() <<
776 " but we wanted to join spovnet " <<
777 spovnetId.toString() );
778
779 // state does not change here, maybe the reply does come in later
780 return false;
781 }
782
783 // if we did not get access to the spovnet notify of the failure and
784 // close the link to the initiator
785 if( ! replyMsg->getJoinAllowed() ){
786
787 logging_error( "our join request has been denied" );
788
789 bc->dropLink( initiatorLink );
790 initiatorLink = LinkID::UNSPECIFIED;
791 state = BaseOverlayStateInvalid;
792
793 // inform all registered services of the event
794 BOOST_FOREACH( NodeListener* i, nodeListeners ){
795 i->onJoinFailed( spovnetId );
796 }
797
798 return true;
799 }
800
801 logging_info( "join request has been accepted for spovnet " <<
802 spovnetId.toString() );
803
804 // if we did get access to the spovnet we try to create the overlay
805 // structure as given in the reply message
806 overlayInterface = OverlayFactory::create( *this,
807 replyMsg->getParam(), nodeId, this );
808
809 if( overlayInterface == NULL ){
810 logging_error( "overlay structure not supported" );
811
812 bc->dropLink( initiatorLink );
813 initiatorLink = LinkID::UNSPECIFIED;
814 state = BaseOverlayStateInvalid;
815
816 // inform all registered services of the event
817 BOOST_FOREACH( NodeListener* i, nodeListeners )
818 i->onJoinFailed( spovnetId );
819
820 return true;
821 }
822
823 /* now start the join process for the overlay. the join process for the
824 * spovnet baseoverlay is now complete. we use the endpoint for overlay
825 * structure bootstrapping that the initiator provided in his reply
826 * message */
827 state = BaseOverlayStateCompleted;
828 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
829
830 overlayInterface->createOverlay();
831 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
832
833 // inform all registered services of the event
834 BOOST_FOREACH( NodeListener* i, nodeListeners ){
835 i->onJoinCompleted( spovnetId );
836 }
837
838 return true;
839
840 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated)
841
842
843 /* ************************************************************************
844 * handle update messages for link establishment
845 */
846 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){
847
848 logging_debug(
849 "baseoverlay received message of type OverlayMessageTypeUpdate"
850 );
851
852 const NodeID& sourcenode = overlayMsg->getSourceNode();
853 const ServiceID& service = overlayMsg->getService();
854
855 // linkmapping for the link available? no-> ignore
856 LinkMapping::iterator i = linkMapping.find( link );
857 if( i == linkMapping.end() ) {
858 logging_warn( "received overlay update message for link " <<
859 link.toString() << " for which we have no mapping" );
860 return false;
861 }
862
863 // update our link mapping information for this link
864 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
865 i->second.node = sourcenode;
866 i->second.service = service;
867
868 // if our link information changed, we send out an update, too
869 if( changed ){
870 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
871 bc->sendMessage( link, &overMsg );
872 }
873
874 // set the correct listener service for the linkitem
875 // now we can tell the registered service of the linkup event
876 if( !communicationListeners.contains( service ) ){
877 logging_warn( "linkup event for service that has not been registered" );
878 return false;
879 }
880
881 CommunicationListener* iface = communicationListeners.get( service );
882 if( iface == NULL || iface == &CommunicationListener::DEFAULT ){
883 logging_warn( "linkup event for service that has been registered "
884 "with a NULL interface" );
885 return true;
886 }
887
888 i->second.interface = iface;
889 i->second.markused();
890
891 // ask the service whether it wants to accept this link
892 if( !iface->onLinkRequest(sourcenode) ){
893
894 logging_debug("link " << link.toString() <<
895 " has been denied by service " << service.toString() << ", dropping link");
896
897 // prevent onLinkDown calls to the service
898 i->second.interface = &CommunicationListener::DEFAULT;
899 // drop the link
900 dropLink( link );
901
902 return true;
903 }
904
905 //
906 // link has been accepted, link is now up, send messages out first
907 //
908
909 i->second.linkup = true;
910 logging_debug("link " << link.toString() <<
911 " has been accepted by service " << service.toString() << " and is now up");
912
913 if( i->second.waitingmsg.size() > 0 ){
914 logging_info( "sending out queued messages on link " << link.toString() );
915
916 BOOST_FOREACH( Message* msg, i->second.waitingmsg ){
917 sendMessage( msg, link );
918 delete msg;
919 }
920
921 i->second.waitingmsg.clear();
922 }
923
924 // call the notification functions
925 iface->onLinkUp( link, sourcenode );
926 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );
927
928 return true;
929
930 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) )
931
932 /* ************************************************************************
933 * handle bye messages
934 */
935 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) {
936
937 logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" );
938 logging_debug( "Received bye message from " <<
939 overlayMsg->getSourceNode().toString() );
940
941 /* if we are the initiator and receive a bye from a node
942 * the node just left. if we are a node and receive a bye
943 * from the initiator, we have to close, too.
944 */
945 if( overlayMsg->getSourceNode() == spovnetInitiator ){
946
947 bc->dropLink( initiatorLink );
948 initiatorLink = LinkID::UNSPECIFIED;
949 state = BaseOverlayStateInvalid;
950
951 logging_fatal( "initiator ended spovnet" );
952
953 // inform all registered services of the event
954 BOOST_FOREACH( NodeListener* i, nodeListeners ){
955 i->onLeaveFailed( spovnetId );
956 }
957
958 } else {
959 // a node that said goodbye and we are the initiator don't have to
960 // do much here, as the node also will go out of the overlay
961 // structure
962 logging_info( "node left " << overlayMsg->getSourceNode() );
963 }
964
965 return true;
966
967 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye))
968
969 /* ************************************************************************
970 * handle link request forwarded through the overlay
971 */
972 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) {
973 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
974 const ServiceID& service = overlayMsg->getService();
975 if (linkReq->isReply()) {
976
977 // find link
978 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
979 if ( i == pendingLinks.end() ) {
980 logging_error( "Nonce not found in link request" );
981 return true;
982 }
983
984 // debug message
985 logging_debug( "LinkRequest reply received. Establishing link "
986 << i->second << " to " << (linkReq->getEndpoint()->toString())
987 << " for service " << service.toString()
988 << " with nonce " << linkReq->getNonce()
989 );
990
991 // establishing link
992 bc->establishLink( *linkReq->getEndpoint(), i->second );
993 } else {
994 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
995 LinkRequest link_request_msg(
996 linkReq->getNonce(), &bc->getEndpointDescriptor(), true );
997 overlay_msg.encapsulate( &link_request_msg );
998
999 // debug message
1000 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1001 linkReq->getNonce() );
1002
1003 // route message back over overlay
1004 overlayInterface->routeMessage(
1005 overlayMsg->getSourceNode(), &overlay_msg
1006 );
1007 }
1008 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest))
1009
1010 /* ************************************************************************
1011 * unknown message type ... error!
1012 */
1013 else {
1014
1015 logging_error( "received message in invalid state! don't know " <<
1016 "what to do with this message of type " <<
1017 overlayMsg->getType() );
1018 return false;
1019
1020 } // else
1021
1022 return false;
1023}
1024
1025void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){
1026
1027 logging_debug( "broadcasting message to all known nodes " <<
1028 "in the overlay from service " + service.toString() );
1029
1030 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
1031
1032 OverlayInterface::NodeList::iterator i = nodes.begin();
1033 OverlayInterface::NodeList::iterator iend = nodes.end();
1034
1035 for( ; i != iend; i++ ){
1036 if( *i == nodeId) continue; // don't send to ourselfs
1037 sendMessage( message, *i, service );
1038 }
1039}
1040
1041vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
1042 // the known nodes _can_ also include our
1043 // node, so we remove ourselfs
1044
1045 vector<NodeID> nodes = overlayInterface->getKnownNodes();
1046 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1047 if( i != nodes.end() ) nodes.erase( i );
1048
1049 return nodes;
1050}
1051
1052void BaseOverlay::updateOvlVis( const NodeID& n ) {
1053 NodeID node = n;
1054/* void visShowNodeBubble (
1055 NETWORK_ID network,
1056 NodeID& node,
1057 string label
1058 );
1059*/
1060 using namespace std;
1061
1062 if (node == nodeId || node.isUnspecified()) return;
1063
1064 // min/max
1065 if ( node < min || min.isUnspecified() ) min = node;
1066 if ( node > max || max.isUnspecified() ) max = node;
1067
1068 // successor
1069 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
1070 if (!succ.isUnspecified() && node != succ)
1071 ovl.visDisconnect(ovlId, nodeId, succ, string(""));
1072 succ = node;
1073 ovl.visConnect(ovlId, nodeId, succ, string(""));
1074 }
1075
1076 // set successor (circle-wrap)
1077 if (succ.isUnspecified() && !min.isUnspecified()) {
1078 succ = min;
1079 ovl.visConnect(ovlId, nodeId, succ, string(""));
1080 }
1081}
1082
1083const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1084
1085 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1086
1087 LinkMapping::const_iterator i = linkMapping.find( lid );
1088 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED;
1089 else return i->second.node;
1090}
1091
1092vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1093
1094 vector<LinkID> linkvector;
1095
1096 BOOST_FOREACH( LinkPair item, linkMapping ){
1097 if( item.second.node == nid || nid == NodeID::UNSPECIFIED ){
1098 linkvector.push_back( item.second.link );
1099 }
1100 }
1101
1102 return linkvector;
1103}
1104
1105void BaseOverlay::incomingRouteMessage(Message* msg){
1106 // gets handled as normal data message
1107 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED );
1108}
1109
1110void BaseOverlay::onNodeJoin(const NodeID& node){
1111
1112 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1113 if( i == joiningNodes.end() ) return;
1114
1115 logging_info( "node has successfully joined baseoverlay and overlay structure "
1116 << node.toString() );
1117
1118 joiningNodes.erase( i );
1119}
1120
1121void BaseOverlay::eventFunction(){
1122
1123 list<LinkID> oldlinks;
1124 time_t now = time(NULL);
1125
1126 // first gather all the links from linkMapping that need droppin
1127 // don't directly drop, as the dropLink function affects the
1128 // linkMapping structure that we are traversing here.
1129 // drop links after a timeout of 30s
1130
1131 BOOST_FOREACH( LinkPair item, linkMapping ){
1132 if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
1133 oldlinks.push_back( item.first );
1134 }
1135
1136 BOOST_FOREACH( const LinkID lnk, oldlinks ) {
1137 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
1138 dropLink( lnk );
1139 }
1140}
1141
1142}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.