source: source/ariba/overlay/BaseOverlay.cpp@ 4838

Last change on this file since 4838 was 4838, checked in by Christoph Mayer, 15 years ago

bootstrap ablauf fixes und avahi fixes

File size: 35.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "BaseOverlay.h"
40
41#include "ariba/utility/misc/OvlVis.h"
42#include "ariba/NodeListener.h"
43#include "ariba/CommunicationListener.h"
44#include "ariba/SideportListener.h"
45
46#include "ariba/overlay/messages/OverlayMsg.h"
47#include "ariba/overlay/messages/JoinRequest.h"
48#include "ariba/overlay/messages/JoinReply.h"
49#include "ariba/overlay/messages/LinkRequest.h"
50
51namespace ariba {
52namespace overlay {
53
54use_logging_cpp(BaseOverlay);
55
56BaseOverlay::BaseOverlay()
57 : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),
58 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED),
59 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT){
60}
61
62BaseOverlay::~BaseOverlay(){
63}
64
65void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){
66
67 bc = &_basecomm;
68 nodeId = _nodeid;
69
70 logging_info("creating base overlay");
71
72 bc->registerMessageReceiver( this );
73 bc->registerEventListener( this );
74
75 ovl.visCreate( ovlId, nodeId, string(""), string("") );
76 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);
77
78// if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
79// Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {
80// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);
81// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
82// Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {
83// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);
84// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
85// Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {
86// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
87// }
88
89 // timer for auto link management
90 Timer::setInterval( 5000 );
91 Timer::start();
92}
93
94void BaseOverlay::stop() {
95
96 logging_info("deleting base overlay");
97
98 Timer::stop();
99 bc->unregisterMessageReceiver( this );
100 bc->unregisterEventListener( this );
101
102 if(overlayInterface != NULL){
103 delete overlayInterface;
104 overlayInterface = NULL;
105 }
106}
107
108void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){
109
110 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
111 logging_info( "starting to join spovnet " << id.toString() <<
112 " with nodeid " << nodeId.toString());
113
114 spovnetId = id;
115 state = BaseOverlayStateJoinInitiated;
116
117 //
118 // start bootstrapping for spovnetid
119 //
120
121 overlayBootstrap.start( this, spovnetId, nodeId );
122 overlayBootstrap.publish( bc->getEndpointDescriptor() );
123
124 //
125 // contact the spovnet initiator and request
126 // to join. if the join is granted we will
127 // receive further information on the structure
128 // of the overlay that is used in the spovnet
129 //
130 // but first, we have to establish a link to the initiator...
131 //
132
133 initiatorLink = bc->establishLink( bootstrapEp );
134 logging_info("join process initiated for " << id.toString() << "...");
135}
136
137void BaseOverlay::leaveSpoVNet(){
138
139 logging_info( "leaving spovnet " << spovnetId );
140 bool ret = ( state != this->BaseOverlayStateInvalid );
141
142 logging_debug( "dropping all auto-links ..." );
143
144 // now we start leaving the spovnet: fist delete all links
145 // that we still have in the baseoverlay initiated by
146 // some services, the leave the actual overlay structure,
147 // then leave the spovnet
148
149 // --> drop all service links
150
151 vector<LinkID> servicelinks;
152 BOOST_FOREACH( LinkPair item, linkMapping ){
153 if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID )
154 servicelinks.push_back( item.first );
155 }
156 BOOST_FOREACH( LinkID lnk, servicelinks ){
157 // the dropLink function will remove
158 // the item from the linkMapping
159 dropLink( lnk );
160 }
161
162 // --> leave overlay structure
163
164 logging_debug( "leaving overlay" );
165 // first, leave the overlay interface
166 if( overlayInterface != NULL )
167 overlayInterface->leaveOverlay();
168
169 // --> leave spovnet
170
171 if( state != BaseOverlayStateInitiator ){
172
173 // then, leave the spovnet baseoverlay
174 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
175 bc->sendMessage( initiatorLink, &overMsg );
176
177 // drop the link and set to correct state
178 bc->dropLink( initiatorLink );
179 initiatorLink = LinkID::UNSPECIFIED;
180 }
181
182 state = BaseOverlayStateInvalid;
183 ovl.visShutdown( ovlId, nodeId, string("") );
184
185 // inform all registered services of the event
186 BOOST_FOREACH( NodeListener* i, nodeListeners ){
187 if( ret ) i->onLeaveCompleted( spovnetId );
188 else i->onLeaveFailed( spovnetId );
189 }
190}
191
192void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){
193
194 // set the state that we are an initiator, this way incoming messages are
195 // handled correctly
196 logging_info( "creating spovnet " + id.toString() <<
197 " with nodeid " << nodeId.toString() );
198
199 spovnetId = id;
200 state = BaseOverlayStateInitiator;
201
202 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
203 if( overlayInterface == NULL ){
204 logging_fatal( "overlay structure not supported" );
205 state = BaseOverlayStateInvalid;
206 return;
207 }
208
209 // bootstrap against ourselfs
210 overlayInterface->joinOverlay();
211 BOOST_FOREACH( NodeListener* i, nodeListeners )
212 i->onJoinCompleted( spovnetId );
213
214 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
215 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
216}
217
218
219/// establishes a link between two arbitrary nodes
220const LinkID BaseOverlay::establishLink( const NodeID& node,
221 const ServiceID& service, const LinkID& link_id ) {
222
223 if( !communicationListeners.contains( service ) ){
224 logging_error( "no registered listener on serviceid " << service.toString() );
225 return LinkID::UNSPECIFIED;
226 }
227
228 // copy link id
229 LinkID linkid = link_id;
230
231 // create link id if necessary
232 if (linkid.isUnspecified()) linkid = LinkID::create();
233
234 // debug message
235 logging_debug( "BaseOverlay called to establish link between node " <<
236 node.toString() << " for service " << service.toString() );
237
238 // create link request message with own link id
239 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
240 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL));
241 LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() );
242 overlay_msg.encapsulate( &link_request_msg );
243 pendingLinks.insert( make_pair(nonce, linkid) );
244
245 // debug message
246 logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() );
247
248 // route message to overlay node
249 overlayInterface->routeMessage( node, &overlay_msg );
250
251 CommunicationListener* receiver = communicationListeners.get( service );
252 assert( receiver != NULL );
253
254 LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver);
255 linkMapping.insert( make_pair(linkid, item) );
256
257 return linkid;
258}
259
260const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep,
261 const ServiceID& service, const LinkID& linkid ){
262
263 if( !communicationListeners.contains( service ) ){
264 logging_error( "no registered listener on serviceid " << service.toString() );
265 return LinkID::UNSPECIFIED;
266 }
267
268 const LinkID link = bc->establishLink( ep, linkid );
269
270 CommunicationListener* receiver = communicationListeners.get( service );
271 assert( receiver != NULL );
272
273 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver);
274 linkMapping.insert( make_pair(link, item) );
275
276 return link;
277}
278
279void BaseOverlay::dropLink(const LinkID& link){
280
281 logging_debug( "baseoverlay dropping link " << link.toString() );
282 LinkMapping::iterator i = linkMapping.find( link );
283
284 // find the link item to drop
285 if( i == linkMapping.end() ){
286 logging_warn( "can't drop link, mapping unknown " << link.toString() );
287 return;
288 }
289
290 LinkItem item = i->second;
291
292 // delete all queued messages
293 if( item.waitingmsg.size() > 0 ){
294
295 logging_warn( "dropping link " << link.toString() <<
296 " that has " << item.waitingmsg.size() << " waiting messages" );
297
298 item.deleteWaiting();
299 }
300
301 // erase the mapping and drop the link
302 linkMapping.erase( i );
303 bc->dropLink( link );
304
305 // tell sideports and listeners of the drop
306 item.interface->onLinkDown( link, item.node );
307 sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId );
308}
309
310seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){
311
312 logging_debug( "baseoverlay is sending data message on link " << link.toString() );
313
314 //
315 // get the mapping for this link
316 //
317
318 LinkMapping::iterator i = linkMapping.find( link );
319 if( i == linkMapping.end() ){
320 logging_error( "could not send message. link not found " << link.toString() );
321 return -1;
322 }
323
324 i->second.markused();
325
326 //
327 // check if the link is up yet, if its an autlink queue message
328 //
329
330 if( !i->second.linkup ){
331
332 if( i->second.autolink ){
333 logging_info( "auto link " << link.toString() << " is not up yet, queueing message" );
334 Data data = data_serialize( message );
335 const_cast<Message*>(message)->dropPayload();
336 i->second.waitingmsg.push_back( new Message(data) );
337 } else {
338 logging_error("link " << link.toString() << " is not up yet, dropping message" );
339 }
340
341 return -1;
342 }
343
344 //
345 // send the message through the basecomm
346 //
347
348 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
349 overmsg.encapsulate( const_cast<Message*>(message) );
350
351 return bc->sendMessage( link, &overmsg );
352}
353
354seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){
355
356 LinkID link = LinkID::UNSPECIFIED;
357
358 LinkMapping::iterator i = linkMapping.begin();
359 LinkMapping::iterator iend = linkMapping.end();
360
361 //
362 // see if we find a link for this node and service destination
363 //
364
365 for( ; i != iend; i++ ){
366 if( i->second.node == node && i->second.service == service ){
367 link = i->second.link;
368 break;
369 }
370 }
371
372 //
373 // if we found no link, create an auto link
374 //
375
376 if( link == LinkID::UNSPECIFIED ){
377
378 logging_info( "no link could be found to send message to node " <<
379 node.toString() << " for service " << service.toString() <<
380 ". creating auto link ...");
381
382 // call basecomm to create a link
383 link = establishLink( node, service );
384
385 // this will call onlinkup on us, if everything worked we now have a mapping
386 LinkMapping::iterator i = linkMapping.find( link );
387 i->second.autolink = true;
388
389 if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){
390 logging_error( "failed to establish auto link to node " << node.toString() <<
391 " for service " << service.toString() );
392 return -1;
393 }
394
395 logging_debug( "establishing autolink in progress to node "
396 << node.toString() << " with new link-id " << link.toString() );
397
398 } // if( link != LinkID::UNSPECIFIED )
399
400 assert( link != LinkID::UNSPECIFIED );
401
402 // mark the link as used, as we
403 // now send a message through it
404 i->second.markused();
405
406 // send the message through the new link. the link may not be functional,
407 // but for us there is a link-id so we can send messages through it. if
408 // the link is not yet up and the message needs to be cached, this is the
409 // task of the BaseCommunication, it will cache and send it later.
410 return sendMessage( message, link );
411}
412
413const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const {
414
415 return bc->getEndpointDescriptor( link );
416}
417
418const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {
419
420 if( node == nodeId || node == NodeID::UNSPECIFIED )
421 return bc->getEndpointDescriptor();
422
423 if( overlayInterface == NULL ){
424 logging_error( "overlay interface not set, cannot resolve endpoint" );
425 return EndpointDescriptor::UNSPECIFIED;
426 }
427
428 // TODO: if this is not a onehop overlay the operation will go asynchronously
429 return overlayInterface->resolveNode( node );
430}
431
432
433bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){
434 logging_debug( "binding communication listener " << listener
435 << " on serviceid " << sid.toString() );
436
437 if( communicationListeners.contains( sid ) ){
438 logging_error( "some listener already registered for service id "
439 << sid.toString() );
440 return false;
441 }
442
443 communicationListeners.registerItem( listener, sid );
444 return true;
445}
446
447bool BaseOverlay::registerSidePort(SideportListener* _sideport){
448 sideport = _sideport;
449 _sideport->configure( this );
450}
451
452bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){
453 sideport = &SideportListener::DEFAULT;
454}
455
456bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){
457 logging_debug( "unbinding listener " << listener
458 << " from serviceid " << sid.toString() );
459
460 if( !communicationListeners.contains( sid ) ){
461 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() );
462 return false;
463 }
464
465 if( communicationListeners.get(sid) != listener ){
466 logging_warn( "listener bound to service id " << sid.toString()
467 << " is different than listener trying to unbind" );
468 return false;
469 }
470
471 communicationListeners.unregisterItem( sid );
472 return true;
473}
474
475bool BaseOverlay::bind(NodeListener* listener){
476 logging_debug( "binding node listener " << listener );
477
478 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
479 if( i != nodeListeners.end() ){
480 logging_warn( "node listener " << listener << " is already bound, cannot bind" );
481 return false;
482 }
483
484 nodeListeners.push_back( listener );
485 return true;
486}
487
488bool BaseOverlay::unbind(NodeListener* listener){
489 logging_debug( "unbinding node listener " << listener );
490
491 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener );
492 if( i == nodeListeners.end() ){
493 logging_warn( "node listener " << listener << " is not bound, cannot unbind" );
494 return false;
495 }
496
497 nodeListeners.erase( i );
498 return true;
499}
500
501void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
502
503 logging_debug( "base overlay received linkup event " + id.toString() );
504 // TODO: updateOvlVis( getNodeID(id) );
505
506 //
507 // if we get up a link while we are in the
508 // join phase and this is the link that
509 // we have initiated towards the spovnet owner
510 // continue the join process by sending
511 // a join request message through the link
512 //
513
514 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){
515
516 logging_info(
517 "Join has been initiated by me and the link is now up. " <<
518 "sending out join request for SpoVNet " << spovnetId.toString()
519 );
520
521 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
522 JoinRequest joinmsg( spovnetId, nodeId );
523 overMsg.encapsulate( &joinmsg );
524
525 state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated
526 bc->sendMessage( id, &overMsg );
527
528 return;
529
530 } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink)
531
532 //
533 // otherwise this is a link initiated by a service
534 // then we exchange update messages to exchange the
535 // service id and node id for the link. in this case
536 // we should have a link mapping for this link. if
537 // we have no link mapping this link was initiated by
538 // the remote side.
539 //
540
541 LinkMapping::iterator i = linkMapping.find( id );
542
543 if( i == linkMapping.end() ){
544
545 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT );
546 linkMapping.insert( make_pair(id, item) );
547
548 } else {
549
550 logging_debug( "sending out OverlayMessageTypeUpdate" <<
551 " for service " << i->second.service.toString() <<
552 " with local node id " << nodeId.toString() <<
553 " on link " << id.toString() );
554
555 OverlayMsg overMsg(
556 OverlayMsg::OverlayMessageTypeUpdate,
557 i->second.service,
558 nodeId
559 );
560
561 bc->sendMessage( id, &overMsg );
562 i->second.markused();
563
564 } // if( i == linkMapping.end() )
565
566 // the link is only valid for the service when we receive
567 // the OverlayMessageTypeUpdate from the remote node and
568 // have the nodeid and serviceid for the link!
569}
570
571void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
572
573 logging_debug( "link went down " << id.toString() );
574
575 //
576 // tell the service that the link went
577 // down and remove the mapping
578 //
579
580 LinkMapping::iterator i = linkMapping.find( id );
581 if( i == linkMapping.end() ) {
582 // this can also be one of the baseoverlay links that
583 // no mapping is stored for. therefore we issue no warning.
584 // it can also be a link that has been dropped and the
585 // mapping is already deleted in the dropLink function.
586 // also, the service notification is issued then in dropLink
587 return;
588 }
589
590 i->second.interface->onLinkDown( id, i->second.node );
591 sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId );
592
593 // delete all queued messages
594 if( i->second.waitingmsg.size() > 0 ){
595
596 logging_warn( "dropping link " << id.toString() <<
597 " that has " << i->second.waitingmsg.size() << " waiting messages" );
598
599 i->second.deleteWaiting();
600 }
601
602 linkMapping.erase( i );
603}
604
605void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){
606
607 logging_debug( "link changed " << id.toString() );
608
609 //
610 // tell the service that the link changed
611 //
612
613 LinkMapping::iterator i = linkMapping.find( id );
614 if( i == linkMapping.end() ) return;
615
616 i->second.interface->onLinkChanged( id, i->second.node );
617 sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId );
618
619 // TODO call onLinkQoSChanged?
620
621 i->second.markused();
622}
623
624void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
625
626 logging_debug( "link failed " << id.toString() );
627
628 //
629 // tell the service that the link failed
630 //
631
632 LinkMapping::iterator i = linkMapping.find( id );
633 if( i == linkMapping.end() ) return;
634
635 i->second.interface->onLinkFail( id, i->second.node );
636 sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId );
637
638 i->second.markused();
639}
640
641void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {
642
643 logging_debug( "link qos changed " << id.toString() );
644
645 //
646 // tell the service that the link qos has changed
647 //
648
649 LinkMapping::iterator i = linkMapping.find( id );
650 if( i == linkMapping.end() ) return;
651
652 // TODO: convert QoSParameterSet to the LinkProperties properties
653 // TODO: currently not in the interface: i->second.interface->onLinkQoSChanged( id, i->second.node, LinkProperties::DEFAULT );
654
655 i->second.markused();
656}
657
658bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){
659
660 // also see in the receiveMessage function. there the higher layer service
661 // is asked whether to accept link requests, but there a basic link association is
662 // already built up, so we know the node id
663 logging_debug("received link request from " << remote->toString() << ", accepting");
664 return true;
665}
666
667
668bool BaseOverlay::receiveMessage(const Message* message,
669 const LinkID& link, const NodeID&
670 /*the nodeid is invalid in this case! removed var to prevent errors*/ ){
671
672 // decapsulate overlay message
673 logging_debug( "receiveMessage: " << message->toString());
674 OverlayMsg* overlayMsg = const_cast<Message*>(message)->decapsulate<OverlayMsg>();
675 if( overlayMsg == NULL ) return false;
676
677 // mark the link as in action
678 LinkMapping::iterator item = linkMapping.find( link );
679 if( item != linkMapping.end() ) item->second.markused();
680
681 /* ************************************************************************
682 /* handle user date that we forward to the appropriate service using the
683 * service id in the message. as we don't know the class of message that
684 * the service handles, we forward it as a pure Message
685 */
686 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {
687
688 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );
689
690 const ServiceID& service = overlayMsg->getService();
691 CommunicationListener* serviceListener = communicationListeners.get( service );
692
693 logging_debug( "received data for service " << service.toString() );
694
695 if( serviceListener != NULL )
696 serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link );
697
698 return true;
699
700 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) )
701
702 /* ************************************************************************
703 /* Handle spovnet instance join requests
704 */
705 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) ){
706
707 logging_debug(
708 "baseoverlay received message of type OverlayMessageTypeJoinRequest"
709 );
710
711 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
712 logging_info( "received join request for spovnet " <<
713 joinReq->getSpoVNetID().toString() );
714
715 /* make sure that the node actually wants to join
716 * the correct spovnet id that we administrate */
717 if( joinReq->getSpoVNetID() != spovnetId ){
718 logging_error( "received join request for spovnet we don't handle " <<
719 joinReq->getSpoVNetID().toString() );
720 return false;
721 }
722
723 //
724 // only if all services allow the node to join it is allowed
725 // using the isJoinAllowed interface security policies can be
726 // implemented by higher layer services
727 //
728
729 // TODO: here you can implement mechanisms to deny joining of a node
730 bool allow = true;
731
732 logging_info( "sending back join reply for spovnet " <<
733 spovnetId.toString() << " to node " <<
734 overlayMsg->getSourceNode().toString() <<
735 ". result: " << (allow ? "allowed" : "denied") );
736
737 joiningNodes.push_back( overlayMsg->getSourceNode() );
738
739 //
740 // send back our spovnetid, default overlay parameters, join allow
741 // result, and ourself as the end-point to bootstrap the overlay against
742 //
743
744 assert( overlayInterface != NULL );
745 OverlayParameterSet parameters = overlayInterface->getParameters();
746
747 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
748 JoinReply replyMsg( spovnetId, parameters,
749 allow, getEndpointDescriptor() );
750
751 retmsg.encapsulate(&replyMsg);
752 bc->sendMessage( link, &retmsg );
753
754 return true;
755
756 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest))
757
758 /* ************************************************************************
759 * handle replies to spovnet instance join requests
760 */
761 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
762 state == BaseOverlayStateJoinInitiated){
763
764 logging_debug(
765 "baseoverlay received message of type OverlayMessageTypeJoinReply");
766
767 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
768 logging_info( "received spovnet join reply" );
769
770 // ensure that we actually wanted to get into the spovnet whose id is
771 // in the message
772 if( replyMsg->getSpoVNetID() != spovnetId ){
773 logging_error( "received spovnet join reply for spovnet " <<
774 replyMsg->getSpoVNetID().toString() <<
775 " but we wanted to join spovnet " <<
776 spovnetId.toString() );
777
778 // state does not change here, maybe the reply does come in later
779 return false;
780 }
781
782 // if we did not get access to the spovnet notify of the failure and
783 // close the link to the initiator
784 if( ! replyMsg->getJoinAllowed() ){
785
786 logging_error( "our join request has been denied" );
787
788 bc->dropLink( initiatorLink );
789 initiatorLink = LinkID::UNSPECIFIED;
790 state = BaseOverlayStateInvalid;
791
792 // inform all registered services of the event
793 BOOST_FOREACH( NodeListener* i, nodeListeners ){
794 i->onJoinFailed( spovnetId );
795 }
796
797 return true;
798 }
799
800 logging_info( "join request has been accepted for spovnet " <<
801 spovnetId.toString() );
802
803 // if we did get access to the spovnet we try to create the overlay
804 // structure as given in the reply message
805 overlayInterface = OverlayFactory::create( *this,
806 replyMsg->getParam(), nodeId, this );
807
808 if( overlayInterface == NULL ){
809 logging_error( "overlay structure not supported" );
810
811 bc->dropLink( initiatorLink );
812 initiatorLink = LinkID::UNSPECIFIED;
813 state = BaseOverlayStateInvalid;
814
815 // inform all registered services of the event
816 BOOST_FOREACH( NodeListener* i, nodeListeners )
817 i->onJoinFailed( spovnetId );
818
819 return true;
820 }
821
822 /* now start the join process for the overlay. the join process for the
823 * spovnet baseoverlay is now complete. we use the endpoint for overlay
824 * structure bootstrapping that the initiator provided in his reply
825 * message */
826 state = BaseOverlayStateCompleted;
827 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
828
829 overlayInterface->createOverlay();
830 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
831
832 // inform all registered services of the event
833 BOOST_FOREACH( NodeListener* i, nodeListeners ){
834 i->onJoinCompleted( spovnetId );
835 }
836
837 return true;
838
839 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated)
840
841
842 /* ************************************************************************
843 * handle update messages for link establishment
844 */
845 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){
846
847 logging_debug(
848 "baseoverlay received message of type OverlayMessageTypeUpdate"
849 );
850
851 const NodeID& sourcenode = overlayMsg->getSourceNode();
852 const ServiceID& service = overlayMsg->getService();
853
854 // linkmapping for the link available? no-> ignore
855 LinkMapping::iterator i = linkMapping.find( link );
856 if( i == linkMapping.end() ) {
857 logging_warn( "received overlay update message for link " <<
858 link.toString() << " for which we have no mapping" );
859 return false;
860 }
861
862 // update our link mapping information for this link
863 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
864 i->second.node = sourcenode;
865 i->second.service = service;
866
867 // if our link information changed, we send out an update, too
868 if( changed ){
869 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
870 bc->sendMessage( link, &overMsg );
871 }
872
873 // set the correct listener service for the linkitem
874 // now we can tell the registered service of the linkup event
875 if( !communicationListeners.contains( service ) ){
876 logging_warn( "linkup event for service that has not been registered" );
877 return false;
878 }
879
880 CommunicationListener* iface = communicationListeners.get( service );
881 if( iface == NULL || iface == &CommunicationListener::DEFAULT ){
882 logging_warn( "linkup event for service that has been registered "
883 "with a NULL interface" );
884 return true;
885 }
886
887 i->second.interface = iface;
888 i->second.markused();
889
890 // ask the service whether it wants to accept this link
891 if( !iface->onLinkRequest(sourcenode) ){
892
893 logging_debug("link " << link.toString() <<
894 " has been denied by service " << service.toString() << ", dropping link");
895
896 // prevent onLinkDown calls to the service
897 i->second.interface = &CommunicationListener::DEFAULT;
898 // drop the link
899 dropLink( link );
900
901 return true;
902 }
903
904 //
905 // link has been accepted, link is now up, send messages out first
906 //
907
908 i->second.linkup = true;
909 logging_debug("link " << link.toString() <<
910 " has been accepted by service " << service.toString() << " and is now up");
911
912 if( i->second.waitingmsg.size() > 0 ){
913 logging_info( "sending out queued messages on link " << link.toString() );
914
915 BOOST_FOREACH( Message* msg, i->second.waitingmsg ){
916 sendMessage( msg, link );
917 delete msg;
918 }
919
920 i->second.waitingmsg.clear();
921 }
922
923 // call the notification functions
924 iface->onLinkUp( link, sourcenode );
925 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId );
926
927 return true;
928
929 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) )
930
931 /* ************************************************************************
932 * handle bye messages
933 */
934 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) {
935
936 logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" );
937 logging_debug( "Received bye message from " <<
938 overlayMsg->getSourceNode().toString() );
939
940 /* if we are the initiator and receive a bye from a node
941 * the node just left. if we are a node and receive a bye
942 * from the initiator, we have to close, too.
943 */
944 if( overlayMsg->getSourceNode() == spovnetInitiator ){
945
946 bc->dropLink( initiatorLink );
947 initiatorLink = LinkID::UNSPECIFIED;
948 state = BaseOverlayStateInvalid;
949
950 logging_fatal( "initiator ended spovnet" );
951
952 // inform all registered services of the event
953 BOOST_FOREACH( NodeListener* i, nodeListeners ){
954 i->onLeaveFailed( spovnetId );
955 }
956
957 } else {
958 // a node that said goodbye and we are the initiator don't have to
959 // do much here, as the node also will go out of the overlay
960 // structure
961 logging_info( "node left " << overlayMsg->getSourceNode() );
962 }
963
964 return true;
965
966 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye))
967
968 /* ************************************************************************
969 * handle link request forwarded through the overlay
970 */
971 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) {
972 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>();
973 const ServiceID& service = overlayMsg->getService();
974 if (linkReq->isReply()) {
975
976 // find link
977 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() );
978 if ( i == pendingLinks.end() ) {
979 logging_error( "Nonce not found in link request" );
980 return true;
981 }
982
983 // debug message
984 logging_debug( "LinkRequest reply received. Establishing link "
985 << i->second << " to " << (linkReq->getEndpoint()->toString())
986 << " for service " << service.toString()
987 << " with nonce " << linkReq->getNonce()
988 );
989
990 // establishing link
991 bc->establishLink( *linkReq->getEndpoint(), i->second );
992 } else {
993 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId );
994 LinkRequest link_request_msg(
995 linkReq->getNonce(), &bc->getEndpointDescriptor(), true );
996 overlay_msg.encapsulate( &link_request_msg );
997
998 // debug message
999 logging_debug( "Sending LinkRequest reply for link with nonce " <<
1000 linkReq->getNonce() );
1001
1002 // route message back over overlay
1003 overlayInterface->routeMessage(
1004 overlayMsg->getSourceNode(), &overlay_msg
1005 );
1006 }
1007 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest))
1008
1009 /* ************************************************************************
1010 * unknown message type ... error!
1011 */
1012 else {
1013
1014 logging_error( "received message in invalid state! don't know " <<
1015 "what to do with this message of type " <<
1016 overlayMsg->getType() );
1017 return false;
1018
1019 } // else
1020
1021 return false;
1022}
1023
1024void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){
1025
1026 logging_debug( "broadcasting message to all known nodes " <<
1027 "in the overlay from service " + service.toString() );
1028
1029 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
1030
1031 OverlayInterface::NodeList::iterator i = nodes.begin();
1032 OverlayInterface::NodeList::iterator iend = nodes.end();
1033
1034 for( ; i != iend; i++ ){
1035 if( *i == nodeId) continue; // don't send to ourselfs
1036 sendMessage( message, *i, service );
1037 }
1038}
1039
1040vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
1041 // the known nodes _can_ also include our
1042 // node, so we remove ourselfs
1043
1044 vector<NodeID> nodes = overlayInterface->getKnownNodes();
1045 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
1046 if( i != nodes.end() ) nodes.erase( i );
1047
1048 return nodes;
1049}
1050
1051void BaseOverlay::updateOvlVis( const NodeID& n ) {
1052 NodeID node = n;
1053/* void visShowNodeBubble (
1054 NETWORK_ID network,
1055 NodeID& node,
1056 string label
1057 );
1058*/
1059 using namespace std;
1060
1061 if (node == nodeId || node.isUnspecified()) return;
1062
1063 // min/max
1064 if ( node < min || min.isUnspecified() ) min = node;
1065 if ( node > max || max.isUnspecified() ) max = node;
1066
1067 // successor
1068 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
1069 if (!succ.isUnspecified() && node != succ)
1070 ovl.visDisconnect(ovlId, nodeId, succ, string(""));
1071 succ = node;
1072 ovl.visConnect(ovlId, nodeId, succ, string(""));
1073 }
1074
1075 // set successor (circle-wrap)
1076 if (succ.isUnspecified() && !min.isUnspecified()) {
1077 succ = min;
1078 ovl.visConnect(ovlId, nodeId, succ, string(""));
1079 }
1080}
1081
1082const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
1083
1084 if( lid == LinkID::UNSPECIFIED ) return nodeId;
1085
1086 LinkMapping::const_iterator i = linkMapping.find( lid );
1087 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED;
1088 else return i->second.node;
1089}
1090
1091vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const {
1092
1093 vector<LinkID> linkvector;
1094
1095 BOOST_FOREACH( LinkPair item, linkMapping ){
1096 if( item.second.node == nid || nid == NodeID::UNSPECIFIED ){
1097 linkvector.push_back( item.second.link );
1098 }
1099 }
1100
1101 return linkvector;
1102}
1103
1104void BaseOverlay::incomingRouteMessage(Message* msg){
1105 // gets handled as normal data message
1106 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED );
1107}
1108
1109void BaseOverlay::onNodeJoin(const NodeID& node){
1110
1111 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
1112 if( i == joiningNodes.end() ) return;
1113
1114 logging_info( "node has successfully joined baseoverlay and overlay structure "
1115 << node.toString() );
1116
1117 joiningNodes.erase( i );
1118}
1119
1120void BaseOverlay::eventFunction(){
1121
1122 list<LinkID> oldlinks;
1123 time_t now = time(NULL);
1124
1125 // first gather all the links from linkMapping that need droppin
1126 // don't directly drop, as the dropLink function affects the
1127 // linkMapping structure that we are traversing here.
1128 // drop links after a timeout of 30s
1129
1130 BOOST_FOREACH( LinkPair item, linkMapping ){
1131 if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
1132 oldlinks.push_back( item.first );
1133 }
1134
1135 BOOST_FOREACH( const LinkID lnk, oldlinks ) {
1136 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
1137 dropLink( lnk );
1138 }
1139}
1140
1141}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.