close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/overlay/BaseOverlay.cpp@ 3038

Last change on this file since 3038 was 3037, checked in by Christoph Mayer, 16 years ago

-jede Menge fixes und Umstellungen
-angefangen ariba/interface los zu werden, erste dateien sind weg

File size: 26.3 KB
RevLine 
1// [Licence]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [Licence]
38
39#include "BaseOverlay.h"
40
41#include "ariba/utility/misc/OvlVis.h"
42using ariba::utility::OvlVis;
43
44namespace ariba {
45namespace overlay {
46
47use_logging_cpp(BaseOverlay);
48
49BaseOverlay::BaseOverlay( BaseCommunication& _basecomm, const NodeID& _nodeid )
50 : bc( _basecomm ), nodeId( _nodeid ), spovnetId( SpoVNetID::UNSPECIFIED), overlayInterface( NULL ),
51 initiatorLink( LinkID::UNSPECIFIED ), state( BaseOverlayStateInvalid ) {
52
53 logging_info("creating base overlay");
54
55 bc.registerMessageReceiver( this );
56 bc.registerEventListener( this );
57
58 ovl.visCreate( ovlId, nodeId, string(""), string("") );
59 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);
60
61// if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
62// Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {
63// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);
64// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
65// Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {
66// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);
67// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
68// Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {
69// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
70// }
71
72 // timer for auto link management
73 Timer::setInterval( 5000 );
74 Timer::start();
75}
76
77BaseOverlay::~BaseOverlay() {
78
79 logging_info("deleting base overlay");
80
81 Timer::stop();
82 bc.unregisterMessageReceiver( this );
83 bc.unregisterEventListener( this );
84}
85
86void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){
87
88 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
89 logging_info( "starting to join spovnet " << id.toString() << "...");
90
91 //
92 // contact the spovnet initiator and request
93 // to join. if the join is granted we will
94 // receive further information on the structure
95 // of the overlay that is used in the spovnet
96 //
97 // but first, we have to establish a link to the initiator...
98 //
99
100 spovnetId = id;
101 state = BaseOverlayStateJoinInitiated;
102
103 initiatorLink = bc.establishLink( bootstrapEp );
104}
105
106void BaseOverlay::leaveSpoVNet(){
107
108 logging_info( "leaving spovnet " << spovnetId );
109 bool ret = ( state != this->BaseOverlayStateInvalid );
110
111 logging_debug( "dropping all auto-links ..." );
112
113 BOOST_FOREACH( LinkPair item, linkMapping ){
114 if( item.second.autolink )
115 dropLink( item.first );
116 }
117
118 logging_debug( "leaving overlay" );
119 // first, leave the overlay interface
120 overlayInterface->leaveOverlay();
121
122 if( state != BaseOverlayStateInitiator ){
123
124 // then, leave the spovnet baseoverlay
125 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
126 bc.sendMessage( initiatorLink, &overMsg );
127
128 // drop the link and set to correct state
129 bc.dropLink( initiatorLink );
130 initiatorLink = LinkID::UNSPECIFIED;
131 }
132
133 state = BaseOverlayStateInvalid;
134 ovl.visShutdown( ovlId, nodeId, string("") );
135
136 // inform all registered services of the event
137 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
138 if( ret ) i->onLeaveSuccess( spovnetId );
139 else i->onLeaveFail( spovnetId );
140 }
141}
142
143void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){
144
145 //
146 // set the state that we are an initiator,
147 // this way incoming messages are handled correctly
148 //
149
150 logging_info("creating spovnet " + id.toString());
151
152 spovnetId = id;
153 state = BaseOverlayStateInitiator;
154
155 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
156 if( overlayInterface == NULL ){
157 logging_fatal( "overlay structure not supported" );
158 state = BaseOverlayStateInvalid;
159 return;
160 }
161
162 //
163 // bootstrap against ourselfs
164 //
165
166 overlayInterface->joinOverlay();
167 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
168 i->onJoinSuccess( spovnetId );
169 }
170
171 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
172 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
173}
174
175const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const {
176
177 return bc.getEndpointDescriptor( link );
178}
179
180const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {
181
182 if( node == nodeId || node == NodeID::UNSPECIFIED )
183 return bc.getEndpointDescriptor();
184
185 if( overlayInterface == NULL ){
186 logging_error( "overlay interface not set, cannot resolve endpoint" );
187 return EndpointDescriptor::UNSPECIFIED;
188 }
189
190 // TODO: if this is not a onehop overlay the operation will go asynchronously
191 return overlayInterface->resolveNode( node );
192}
193
194const LinkID BaseOverlay::establishLink(const NodeID& node, const ServiceID& service){
195
196 // TODO: if this is not a onehop overlay the operation will go asynchronously
197 const EndpointDescriptor& endpoint = overlayInterface->resolveNode( node );
198 if( endpoint == EndpointDescriptor::UNSPECIFIED ){
199 logging_error( "could not resolve node to endpoint. unable to establish link" );
200 return LinkID::UNSPECIFIED;
201 }
202
203 logging_debug( "baseoverlay called to establish link between node " <<
204 node.toString() << " on endpoint " << endpoint.toString() <<
205 " for service " << service.toString() );
206
207 return establishLink( endpoint, service );
208}
209
210const LinkID BaseOverlay::establishLink(const EndpointDescriptor& ep, const ServiceID& service){
211
212 if( !listenerMux.contains( service ) ){
213 logging_error( "no registered listener on serviceid " << service.toString() );
214 return LinkID::UNSPECIFIED;
215 }
216
217 ServiceInterface* receiver = listenerMux.get( service );
218 const LinkID link = bc.establishLink( ep );
219
220 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver);
221 linkMapping.insert( make_pair(link, item) );
222
223 return link;
224}
225
226void BaseOverlay::dropLink(const LinkID& link){
227
228 logging_debug( "baseoverlay dropping link " << link.toString() );
229 LinkMapping::iterator i = linkMapping.find( link );
230
231 if( i == linkMapping.end() ){
232 logging_warn( "can't drop link, mapping unknown " << link.toString() );
233 return;
234 }
235
236 linkMapping.erase( i );
237
238 LinkItem item = i->second;
239 bc.dropLink( link );
240
241 if( item.interface != NULL )
242 item.interface->onLinkDown( spovnetId, nodeId, item.node );
243}
244
245seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){
246
247 logging_debug( "baseoverlay is sending message on link " << link.toString() );
248
249 LinkMapping::iterator i = linkMapping.find( link );
250 if( i == linkMapping.end() ){
251 logging_error( "could not send message. link not found " << link.toString() );
252 return -1;
253 }
254
255 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
256 overmsg.encapsulate( const_cast<Message*>(message) );
257
258 i->second.markused();
259 return bc.sendMessage( link, &overmsg );
260}
261
262seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){
263
264 LinkID link = LinkID::UNSPECIFIED;
265
266 LinkMapping::iterator i = linkMapping.begin();
267 LinkMapping::iterator iend = linkMapping.end();
268
269 for( ; i != iend; i++ ){
270 if( i->second.node == node && i->second.service == service ){
271 link = i->second.link;
272 break;
273 }
274 }
275
276 if( link == LinkID::UNSPECIFIED ){
277
278 logging_info( "no link could be found to send message to node " <<
279 node.toString() << " for service " << service.toString() <<
280 ". creating auto link ...");
281
282 const LinkID link = establishLink( node, service );
283 LinkMapping::iterator i = linkMapping.find( link );
284
285 if( i == linkMapping.end() ){
286 logging_error( "failed to establish auto link to node " << node.toString() <<
287 " for service " << service.toString() );
288 return -1;
289 }
290
291 i->second.autolink = true;
292
293 } // if( link != LinkID::UNSPECIFIED )
294
295 i->second.markused();
296 return sendMessage( message, link );
297}
298
299bool BaseOverlay::bind(ServiceInterface* service, const ServiceID& sid) {
300
301 logging_debug( "binding service " << service << " on serviceid " << sid.toString() );
302
303 if( listenerMux.contains( sid ) ){
304 logging_error( "some service already registered for service id " << sid.toString() );
305 return false;
306 }
307
308 listenerMux.registerItem( service, sid );
309 return true;
310}
311
312ServiceInterface* BaseOverlay::unbind(const ServiceID& sid){
313
314 logging_debug( "unbinding service from serviceid " << sid.toString() );
315
316 if( !listenerMux.contains( sid ) ){
317 logging_warn( "cannot unbind service. no service registered on service id " << sid.toString() );
318 return NULL;
319 }
320
321 ServiceInterface* iface = listenerMux.get( sid );
322 listenerMux.unregisterItem( sid );
323
324 return NULL; //iface;
325}
326
327void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
328
329 logging_debug( "base overlay received linkup event " + id.toString() );
330 // TODO: updateOvlVis( getNodeID(id) );
331
332 //
333 // if we get up a link while we are in the
334 // join phase and this is the link that
335 // we have initiated towards the spovnet owner
336 // continue the join process by sending
337 // a join request message through the link
338 //
339
340 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){
341
342 logging_info( "join has been initiated by me and the link is now up. " <<
343 "sending out join request for spovnet " <<
344 spovnetId.toString() );
345
346 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
347 JoinRequest joinmsg( spovnetId, nodeId );
348 overMsg.encapsulate( &joinmsg );
349
350 state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated
351 bc.sendMessage( id, &overMsg );
352
353 return;
354
355 } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink)
356
357 //
358 // otherwise this is a link initiated by a service
359 // then we exchange update messages to exchange the
360 // service id and node id for the link. in this case
361 // we should have a link mapping for this link. if
362 // we have no link mapping this link was initiated by
363 // the remote side.
364 //
365
366 LinkMapping::iterator i = linkMapping.find( id );
367
368 if( i == linkMapping.end() ){
369
370 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, NULL );
371 linkMapping.insert( make_pair(id, item) );
372
373 } else {
374
375 logging_debug( "sending out OverlayMessageTypeUpdate" <<
376 " for service " << i->second.service.toString() <<
377 " with local node id " << nodeId.toString() <<
378 " on link " << id.toString() );
379
380 OverlayMsg overMsg(
381 OverlayMsg::OverlayMessageTypeUpdate,
382 i->second.service,
383 nodeId
384 );
385
386 bc.sendMessage( id, &overMsg );
387 i->second.markused();
388
389 } // if( i == linkMapping.end() )
390
391 // the link is only valid for the service when we receive
392 // the OverlayMessageTypeUpdate from the remote node and
393 // have the nodeid and serviceid for the link!
394}
395
396void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
397
398 logging_debug( "link went down " << id.toString() );
399
400 //
401 // tell the service that the link went
402 // down and remove the mapping
403 //
404
405 LinkMapping::iterator i = linkMapping.find( id );
406 if( i == linkMapping.end() ) {
407 // this can also be one of the baseoverlay links that
408 // no mapping is stored for. therefore we issue no warning
409 return;
410 }
411
412 if( i->second.interface != NULL )
413 i->second.interface->onLinkDown( id, nodeId, i->second.node );
414
415 linkMapping.erase( i );
416}
417
418void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){
419
420 logging_debug( "link changed " << id.toString() );
421
422 //
423 // tell the service that the link changed
424 //
425
426 LinkMapping::iterator i = linkMapping.find( id );
427 if( i == linkMapping.end() ) return;
428
429 if( i->second.interface != NULL )
430 i->second.interface->onLinkChanged( id, nodeId, i->second.node );
431
432 i->second.markused();
433}
434
435void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
436
437 logging_debug( "link failed " << id.toString() );
438
439 //
440 // tell the service that the link failed
441 //
442
443 LinkMapping::iterator i = linkMapping.find( id );
444 if( i == linkMapping.end() ) return;
445
446 if( i->second.interface != NULL )
447 i->second.interface->onLinkFail( id, nodeId, i->second.node );
448
449 i->second.markused();
450}
451
452void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {
453
454 logging_debug( "link qos changed " << id.toString() );
455
456 //
457 // tell the service that the link qos has changed
458 //
459
460 LinkMapping::iterator i = linkMapping.find( id );
461 if( i == linkMapping.end() ) return;
462
463 if( i->second.interface != NULL )
464 i->second.interface->onLinkQoSChanged( id, nodeId, i->second.node, qos );
465
466 i->second.markused();
467}
468
469bool BaseOverlay::receiveMessage(const Message* message,
470 const LinkID& link, const NodeID& /*the nodeid is invalid in this case! removed var to prevent errors*/ ){
471
472 OverlayMsg* overlayMsg = ((Message*)message)->decapsulate<OverlayMsg>();
473 if( overlayMsg == NULL ) return false;
474
475 // mark the link as in action
476 LinkMapping::iterator item = linkMapping.find( link );
477 if( item != linkMapping.end() ) item->second.markused();
478
479 //
480 // handle user date that we forward to the
481 // appropriate service using the service id
482 // in the message. as we don't know the class
483 // of message that the service handles, we
484 // forward it as a pure Message*
485 //
486
487 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {
488
489 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );
490
491 const ServiceID& service = overlayMsg->getService();
492 ServiceInterface* serviceListener = listenerMux.get( service );
493
494 logging_debug( "received data for service " << service.toString() );
495
496 if( serviceListener != NULL )
497 serviceListener->receiveMessage( overlayMsg, link, overlayMsg->getSourceNode() );
498
499 return true;
500
501 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) )
502
503 //
504 // handle spovnet instance join requests
505 //
506
507 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) &&
508 state == BaseOverlayStateInitiator){
509
510 logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinRequest" );
511
512 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
513 logging_info( "received join request for spovnet " <<
514 joinReq->getSpoVNetID().toString() );
515
516 //
517 // make sure that the node actually wants to join
518 // the correct spovnet id that we administrate
519 //
520
521 if( joinReq->getSpoVNetID() != spovnetId ){
522 logging_error( "received join request for spovnet we don't handle " <<
523 joinReq->getSpoVNetID().toString() );
524 return false;
525 }
526
527 //
528 // only if all services allow the node to join it is allowed
529 // using the isJoinAllowed interface security policies can be
530 // implemented by higher layer services
531 //
532
533 bool allow = true;
534
535 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
536 allow &= i->isJoinAllowed( overlayMsg->getSourceNode(), spovnetId );
537 }
538
539 logging_info( "sending back join reply for spovnet " <<
540 spovnetId.toString() << " to node " <<
541 overlayMsg->getSourceNode().toString() <<
542 ". result: " << (allow ? "allowed" : "denied") );
543
544 joiningNodes.push_back( overlayMsg->getSourceNode() );
545
546 //
547 // send back our spovnetid, default overlay parameters,
548 // join allow result, and ourself as the endpoint
549 // to bootstrap the overlay against
550 //
551
552 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
553 JoinReply replyMsg( spovnetId, OverlayParameterSet::DEFAULT,
554 allow, getEndpointDescriptor() );
555
556 retmsg.encapsulate(&replyMsg);
557 bc.sendMessage( link, &retmsg );
558
559 return true;
560
561 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) && state == BaseOverlayStateInitiator)
562
563 //
564 // handle replies to spovnet instance join requests
565 //
566
567 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
568 state == BaseOverlayStateJoinInitiated){
569
570 logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinReply" );
571
572 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
573 logging_info( "received spovnet join reply" );
574
575 //
576 // make sure that we actually wanted to get
577 // into the spovnet whose id is in the message
578 //
579
580 if( replyMsg->getSpoVNetID() != spovnetId ){
581 logging_error( "received spovnet join reply for spovnet " <<
582 replyMsg->getSpoVNetID().toString() <<
583 " but we wanted to join spovnet " <<
584 spovnetId.toString() );
585
586 // state does not change here, maybe
587 // the reply does come in later
588 return false;
589 }
590
591 //
592 // if we did not get access to the spovnet
593 // notify of the failure and
594 // close the link to the initiator
595 //
596
597 if( ! replyMsg->getJoinAllowed() ){
598
599 logging_warn( "our join request has been denied" );
600
601 bc.dropLink( initiatorLink );
602 initiatorLink = LinkID::UNSPECIFIED;
603 state = BaseOverlayStateInvalid;
604
605 // inform all registered services of the event
606 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
607 i->onJoinFail( spovnetId );
608 }
609
610 return true;
611 }
612
613 logging_info( "join request has been accepted for spovnet " << spovnetId.toString() );
614
615 //
616 // if we did get access to the spovnet
617 // we try to create the overlay structure
618 // as given in the reply message
619 //
620
621 overlayInterface = OverlayFactory::create( *this, replyMsg->getParam(), nodeId, this );
622
623 if( overlayInterface == NULL ){
624 logging_error( "overlay structure not supported" );
625
626 bc.dropLink( initiatorLink );
627 initiatorLink = LinkID::UNSPECIFIED;
628 state = BaseOverlayStateInvalid;
629
630 // inform all registered services of the event
631 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
632 i->onJoinFail( spovnetId );
633 }
634
635 return true;
636 }
637
638 //
639 // now start the join process for the overlay.
640 // the join process for the spovnet baseoverlay
641 // is now complete. we use the endpoint for
642 // overlay structure bootstrapping that the
643 // initiator provided in his reply message
644 //
645
646 state = BaseOverlayStateCompleted;
647 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
648
649 overlayInterface->createOverlay();
650 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
651
652 // inform all registered services of the event
653 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
654 i->onJoinSuccess( spovnetId );
655 }
656
657 return true;
658
659 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated)
660
661
662 //
663 // handle update messages for link establishment
664 //
665
666 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){
667
668 logging_debug( "baseoverlay received message of type OverlayMessageTypeUpdate" );
669
670 const NodeID& sourcenode = overlayMsg->getSourceNode();
671 const ServiceID& service = overlayMsg->getService();
672
673 //
674 // we should have a linkmapping for the link, otherwise
675 // we ignore update messages
676 //
677
678 LinkMapping::iterator i = linkMapping.find( link );
679 if( i == linkMapping.end() ){
680 logging_warn( "received overlay update message for link " <<
681 link.toString() << " for which we have no mapping" );
682 return false;
683 }
684
685 //
686 // update our link mapping information for this link
687 //
688
689 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
690
691 i->second.node = sourcenode;
692 i->second.service = service;
693
694 //
695 // if our link information changed, we send out an update, too
696 //
697
698 if( changed ){
699 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
700 bc.sendMessage( link, &overMsg );
701 }
702
703 //
704 // set the correct listener service for the linkitem
705 // now we can tell the registered service of the linkup event
706 //
707
708 if( !listenerMux.contains( service ) ){
709 logging_warn( "linkup event for service that has not been registered" );
710 return false;
711 }
712
713 ServiceInterface* iface = listenerMux.get( service );
714 if( iface == NULL ){
715 logging_warn( "linkup event for service that has been registered with a NULL interface" );
716 return true;
717 }
718
719 i->second.interface = iface;
720 iface->onLinkUp( link, nodeId, sourcenode );
721 i->second.markused();
722
723 return true ;
724
725 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) )
726
727 //
728 // bye messages to say goodbye
729 //
730
731 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye)){
732
733 logging_debug( "baseoverlay received message of type OverlayMessageTypeBye" );
734
735 logging_debug( "received bye message from " <<
736 overlayMsg->getSourceNode().toString() );
737
738 //
739 // if we are the initiator and receive a bye from a node
740 // the node just left. if we are a node and receive a bye
741 // from the initiator, we have to close, too.
742 //
743
744 if( overlayMsg->getSourceNode() == spovnetInitiator ){
745
746 bc.dropLink( initiatorLink );
747 initiatorLink = LinkID::UNSPECIFIED;
748 state = BaseOverlayStateInvalid;
749
750 logging_fatal( "initiator ended spovnet" );
751
752 // inform all registered services of the event
753 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
754 i->onLeaveFail( spovnetId );
755 }
756
757 } else {
758
759 // a node that said goodbye and we are the initiator
760 // don't have to do much here, as the node also
761 // will go out of the overlay structure
762 logging_info( "node left " << overlayMsg->getSourceNode() );
763
764 // inform all registered services of the event
765 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
766 i->onNodeLeave( overlayMsg->getSourceNode(), spovnetId );
767 }
768 }
769
770 return true;
771
772 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye))
773
774 //
775 // something wrong ...
776 //
777
778 else {
779
780 logging_error( "received message in invalid state! don't know " <<
781 "what to do with this message of type " <<
782 overlayMsg->getType() );
783 return false;
784
785 } // else
786
787 return false;
788}
789
790void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){
791
792 logging_debug( "broadcasting message to all known nodes " <<
793 "in the overlay from service " + service.toString() );
794
795 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
796
797 OverlayInterface::NodeList::iterator i = nodes.begin();
798 OverlayInterface::NodeList::iterator iend = nodes.end();
799
800 for( ; i != iend; i++ ){
801 if( *i == nodeId) continue; // don't send to ourselfs
802 sendMessage( message, *i, service ); // TODO: sollte auto links aufbauen fÃŒr sowas
803 }
804}
805
806vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
807 // the known nodes _can_ also include our
808 // node, so we remove ourselfs
809
810 vector<NodeID> nodes = overlayInterface->getKnownNodes();
811 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
812 if( i != nodes.end() ) nodes.erase( i );
813
814 return nodes;
815}
816
817void BaseOverlay::updateOvlVis( const NodeID& n ) {
818 NodeID node = n;
819/* void visShowNodeBubble (
820 NETWORK_ID network,
821 NodeID& node,
822 string label
823 );
824*/
825 using namespace std;
826
827 if (node == nodeId || node.isUnspecified()) return;
828
829 // min/max
830 if ( node < min || min.isUnspecified() ) min = node;
831 if ( node > max || max.isUnspecified() ) max = node;
832
833 // successor
834 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
835 if (!succ.isUnspecified() && node != succ)
836 ovl.visDisconnect(ovlId, nodeId, succ, string(""));
837 succ = node;
838 ovl.visConnect(ovlId, nodeId, succ, string(""));
839 }
840
841 // set successor (circle-wrap)
842 if (succ.isUnspecified() && !min.isUnspecified()) {
843 succ = min;
844 ovl.visConnect(ovlId, nodeId, succ, string(""));
845 }
846}
847
848const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
849
850 if( lid == LinkID::UNSPECIFIED ) return nodeId;
851
852 LinkMapping::const_iterator i = linkMapping.find( lid );
853 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED;
854 else return i->second.node;
855}
856
857void BaseOverlay::incomingRouteMessage(Message* msg){
858 // gets handled as normal data message
859 // TODO: passt das so?
860 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED );
861}
862
863void BaseOverlay::onNodeJoin(const NodeID& node){
864
865 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
866 if( i == joiningNodes.end() ) return;
867
868 logging_info( "node has successfully joined baseoverlay and overlay structure "
869 << node.toString() );
870
871 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
872 i->onNodeJoin( node, spovnetId );
873 }
874
875 joiningNodes.erase( i );
876}
877
878void BaseOverlay::eventFunction(){
879
880 list<LinkID> oldlinks;
881 time_t now = time(NULL);
882
883 // first gather all the links from linkMapping that need droppin
884 // don't directly drop, as the dropLink function affects the
885 // linkMapping structure that we are traversing here.
886 // drop links after a timeout of 30s
887
888 BOOST_FOREACH( LinkPair item, linkMapping ){
889 if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
890 oldlinks.push_back( item.first );
891 }
892
893 BOOST_FOREACH( const LinkID lnk, oldlinks ){
894 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
895 dropLink( lnk );
896 }
897}
898
899}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.