source: source/ariba/overlay/BaseOverlay.cpp@ 3041

Last change on this file since 3041 was 3041, checked in by Christoph Mayer, 15 years ago

-mehrere Fixes, Tickets #25 (bind listeners earlier), #21 (better pingpong), #40 (systemqueue misbehavior)

File size: 26.4 KB
Line 
1// [Licence]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [Licence]
38
39#include "BaseOverlay.h"
40
41#include "ariba/utility/misc/OvlVis.h"
42using ariba::utility::OvlVis;
43
44namespace ariba {
45namespace overlay {
46
47use_logging_cpp(BaseOverlay);
48
49BaseOverlay::BaseOverlay()
50 : bc(NULL), overlayInterface(NULL),
51 nodeId(NodeID::UNSPECIFIED), spovnetId(SpoVNetID::UNSPECIFIED),
52 initiatorLink(LinkID::UNSPECIFIED), state(BaseOverlayStateInvalid){
53}
54
55BaseOverlay::~BaseOverlay(){
56}
57
58void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){
59
60 bc = &_basecomm;
61 nodeId = _nodeid;
62
63 logging_info("creating base overlay");
64
65 bc->registerMessageReceiver( this );
66 bc->registerEventListener( this );
67
68 ovl.visCreate( ovlId, nodeId, string(""), string("") );
69 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);
70
71// if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
72// Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {
73// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);
74// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
75// Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {
76// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);
77// } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==
78// Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {
79// ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);
80// }
81
82 // timer for auto link management
83 Timer::setInterval( 5000 );
84 Timer::start();
85}
86
87void BaseOverlay::stop() {
88
89 logging_info("deleting base overlay");
90
91 Timer::stop();
92 bc->unregisterMessageReceiver( this );
93 bc->unregisterEventListener( this );
94}
95
96void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){
97
98 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." );
99 logging_info( "starting to join spovnet " << id.toString() << "...");
100
101 //
102 // contact the spovnet initiator and request
103 // to join. if the join is granted we will
104 // receive further information on the structure
105 // of the overlay that is used in the spovnet
106 //
107 // but first, we have to establish a link to the initiator...
108 //
109
110 spovnetId = id;
111 state = BaseOverlayStateJoinInitiated;
112
113 initiatorLink = bc->establishLink( bootstrapEp );
114 logging_info("join process initiated for " << id.toString() << "...");
115}
116
117void BaseOverlay::leaveSpoVNet(){
118
119 logging_info( "leaving spovnet " << spovnetId );
120 bool ret = ( state != this->BaseOverlayStateInvalid );
121
122 logging_debug( "dropping all auto-links ..." );
123
124 BOOST_FOREACH( LinkPair item, linkMapping ){
125 if( item.second.autolink )
126 dropLink( item.first );
127 }
128
129 logging_debug( "leaving overlay" );
130 // first, leave the overlay interface
131 overlayInterface->leaveOverlay();
132
133 if( state != BaseOverlayStateInitiator ){
134
135 // then, leave the spovnet baseoverlay
136 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId );
137 bc->sendMessage( initiatorLink, &overMsg );
138
139 // drop the link and set to correct state
140 bc->dropLink( initiatorLink );
141 initiatorLink = LinkID::UNSPECIFIED;
142 }
143
144 state = BaseOverlayStateInvalid;
145 ovl.visShutdown( ovlId, nodeId, string("") );
146
147 // inform all registered services of the event
148 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
149 if( ret ) i->onLeaveSuccess( spovnetId );
150 else i->onLeaveFail( spovnetId );
151 }
152}
153
154void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){
155
156 //
157 // set the state that we are an initiator,
158 // this way incoming messages are handled correctly
159 //
160
161 logging_info("creating spovnet " + id.toString());
162
163 spovnetId = id;
164 state = BaseOverlayStateInitiator;
165
166 overlayInterface = OverlayFactory::create( *this, param, nodeId, this );
167 if( overlayInterface == NULL ){
168 logging_fatal( "overlay structure not supported" );
169 state = BaseOverlayStateInvalid;
170 return;
171 }
172
173 //
174 // bootstrap against ourselfs
175 //
176
177 overlayInterface->joinOverlay();
178 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
179 i->onJoinSuccess( spovnetId );
180 }
181
182 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA );
183 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
184}
185
186const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const {
187
188 return bc->getEndpointDescriptor( link );
189}
190
191const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const {
192
193 if( node == nodeId || node == NodeID::UNSPECIFIED )
194 return bc->getEndpointDescriptor();
195
196 if( overlayInterface == NULL ){
197 logging_error( "overlay interface not set, cannot resolve endpoint" );
198 return EndpointDescriptor::UNSPECIFIED;
199 }
200
201 // TODO: if this is not a onehop overlay the operation will go asynchronously
202 return overlayInterface->resolveNode( node );
203}
204
205const LinkID BaseOverlay::establishLink(const NodeID& node, const ServiceID& service){
206
207 // TODO: if this is not a onehop overlay the operation will go asynchronously
208 const EndpointDescriptor& endpoint = overlayInterface->resolveNode( node );
209 if( endpoint == EndpointDescriptor::UNSPECIFIED ){
210 logging_error( "could not resolve node to endpoint. unable to establish link" );
211 return LinkID::UNSPECIFIED;
212 }
213
214 logging_debug( "baseoverlay called to establish link between node " <<
215 node.toString() << " on endpoint " << endpoint.toString() <<
216 " for service " << service.toString() );
217
218 return establishLink( endpoint, service );
219}
220
221const LinkID BaseOverlay::establishLink(const EndpointDescriptor& ep, const ServiceID& service){
222
223 if( !listenerMux.contains( service ) ){
224 logging_error( "no registered listener on serviceid " << service.toString() );
225 return LinkID::UNSPECIFIED;
226 }
227
228 ServiceInterface* receiver = listenerMux.get( service );
229 const LinkID link = bc->establishLink( ep );
230
231 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver);
232 linkMapping.insert( make_pair(link, item) );
233
234 return link;
235}
236
237void BaseOverlay::dropLink(const LinkID& link){
238
239 logging_debug( "baseoverlay dropping link " << link.toString() );
240 LinkMapping::iterator i = linkMapping.find( link );
241
242 if( i == linkMapping.end() ){
243 logging_warn( "can't drop link, mapping unknown " << link.toString() );
244 return;
245 }
246
247 linkMapping.erase( i );
248
249 LinkItem item = i->second;
250 bc->dropLink( link );
251
252 if( item.interface != NULL )
253 item.interface->onLinkDown( spovnetId, nodeId, item.node );
254}
255
256seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){
257
258 logging_debug( "baseoverlay is sending message on link " << link.toString() );
259
260 LinkMapping::iterator i = linkMapping.find( link );
261 if( i == linkMapping.end() ){
262 logging_error( "could not send message. link not found " << link.toString() );
263 return -1;
264 }
265
266 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId );
267 overmsg.encapsulate( const_cast<Message*>(message) );
268
269 i->second.markused();
270 return bc->sendMessage( link, &overmsg );
271}
272
273seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){
274
275 LinkID link = LinkID::UNSPECIFIED;
276
277 LinkMapping::iterator i = linkMapping.begin();
278 LinkMapping::iterator iend = linkMapping.end();
279
280 for( ; i != iend; i++ ){
281 if( i->second.node == node && i->second.service == service ){
282 link = i->second.link;
283 break;
284 }
285 }
286
287 if( link == LinkID::UNSPECIFIED ){
288
289 logging_info( "no link could be found to send message to node " <<
290 node.toString() << " for service " << service.toString() <<
291 ". creating auto link ...");
292
293 const LinkID link = establishLink( node, service );
294 LinkMapping::iterator i = linkMapping.find( link );
295
296 if( i == linkMapping.end() ){
297 logging_error( "failed to establish auto link to node " << node.toString() <<
298 " for service " << service.toString() );
299 return -1;
300 }
301
302 i->second.autolink = true;
303
304 } // if( link != LinkID::UNSPECIFIED )
305
306 i->second.markused();
307 return sendMessage( message, link );
308}
309
310bool BaseOverlay::bind(ServiceInterface* service, const ServiceID& sid) {
311
312 logging_debug( "binding service " << service << " on serviceid " << sid.toString() );
313
314 if( listenerMux.contains( sid ) ){
315 logging_error( "some service already registered for service id " << sid.toString() );
316 return false;
317 }
318
319 listenerMux.registerItem( service, sid );
320 return true;
321}
322
323ServiceInterface* BaseOverlay::unbind(const ServiceID& sid){
324
325 logging_debug( "unbinding service from serviceid " << sid.toString() );
326
327 if( !listenerMux.contains( sid ) ){
328 logging_warn( "cannot unbind service. no service registered on service id " << sid.toString() );
329 return NULL;
330 }
331
332 ServiceInterface* iface = listenerMux.get( sid );
333 listenerMux.unregisterItem( sid );
334
335 return NULL; //iface;
336}
337
338void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
339
340 logging_debug( "base overlay received linkup event " + id.toString() );
341 // TODO: updateOvlVis( getNodeID(id) );
342
343 //
344 // if we get up a link while we are in the
345 // join phase and this is the link that
346 // we have initiated towards the spovnet owner
347 // continue the join process by sending
348 // a join request message through the link
349 //
350
351 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){
352
353 logging_info( "join has been initiated by me and the link is now up. " <<
354 "sending out join request for spovnet " <<
355 spovnetId.toString() );
356
357 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId );
358 JoinRequest joinmsg( spovnetId, nodeId );
359 overMsg.encapsulate( &joinmsg );
360
361 state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated
362 bc->sendMessage( id, &overMsg );
363
364 return;
365
366 } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink)
367
368 //
369 // otherwise this is a link initiated by a service
370 // then we exchange update messages to exchange the
371 // service id and node id for the link. in this case
372 // we should have a link mapping for this link. if
373 // we have no link mapping this link was initiated by
374 // the remote side.
375 //
376
377 LinkMapping::iterator i = linkMapping.find( id );
378
379 if( i == linkMapping.end() ){
380
381 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, NULL );
382 linkMapping.insert( make_pair(id, item) );
383
384 } else {
385
386 logging_debug( "sending out OverlayMessageTypeUpdate" <<
387 " for service " << i->second.service.toString() <<
388 " with local node id " << nodeId.toString() <<
389 " on link " << id.toString() );
390
391 OverlayMsg overMsg(
392 OverlayMsg::OverlayMessageTypeUpdate,
393 i->second.service,
394 nodeId
395 );
396
397 bc->sendMessage( id, &overMsg );
398 i->second.markused();
399
400 } // if( i == linkMapping.end() )
401
402 // the link is only valid for the service when we receive
403 // the OverlayMessageTypeUpdate from the remote node and
404 // have the nodeid and serviceid for the link!
405}
406
407void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
408
409 logging_debug( "link went down " << id.toString() );
410
411 //
412 // tell the service that the link went
413 // down and remove the mapping
414 //
415
416 LinkMapping::iterator i = linkMapping.find( id );
417 if( i == linkMapping.end() ) {
418 // this can also be one of the baseoverlay links that
419 // no mapping is stored for. therefore we issue no warning
420 return;
421 }
422
423 if( i->second.interface != NULL )
424 i->second.interface->onLinkDown( id, nodeId, i->second.node );
425
426 linkMapping.erase( i );
427}
428
429void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){
430
431 logging_debug( "link changed " << id.toString() );
432
433 //
434 // tell the service that the link changed
435 //
436
437 LinkMapping::iterator i = linkMapping.find( id );
438 if( i == linkMapping.end() ) return;
439
440 if( i->second.interface != NULL )
441 i->second.interface->onLinkChanged( id, nodeId, i->second.node );
442
443 i->second.markused();
444}
445
446void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){
447
448 logging_debug( "link failed " << id.toString() );
449
450 //
451 // tell the service that the link failed
452 //
453
454 LinkMapping::iterator i = linkMapping.find( id );
455 if( i == linkMapping.end() ) return;
456
457 if( i->second.interface != NULL )
458 i->second.interface->onLinkFail( id, nodeId, i->second.node );
459
460 i->second.markused();
461}
462
463void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) {
464
465 logging_debug( "link qos changed " << id.toString() );
466
467 //
468 // tell the service that the link qos has changed
469 //
470
471 LinkMapping::iterator i = linkMapping.find( id );
472 if( i == linkMapping.end() ) return;
473
474 if( i->second.interface != NULL )
475 i->second.interface->onLinkQoSChanged( id, nodeId, i->second.node, qos );
476
477 i->second.markused();
478}
479
480bool BaseOverlay::receiveMessage(const Message* message,
481 const LinkID& link, const NodeID& /*the nodeid is invalid in this case! removed var to prevent errors*/ ){
482
483 OverlayMsg* overlayMsg = ((Message*)message)->decapsulate<OverlayMsg>();
484 if( overlayMsg == NULL ) return false;
485
486 // mark the link as in action
487 LinkMapping::iterator item = linkMapping.find( link );
488 if( item != linkMapping.end() ) item->second.markused();
489
490 //
491 // handle user date that we forward to the
492 // appropriate service using the service id
493 // in the message. as we don't know the class
494 // of message that the service handles, we
495 // forward it as a pure Message*
496 //
497
498 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) {
499
500 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" );
501
502 const ServiceID& service = overlayMsg->getService();
503 ServiceInterface* serviceListener = listenerMux.get( service );
504
505 logging_debug( "received data for service " << service.toString() );
506
507 if( serviceListener != NULL )
508 serviceListener->receiveMessage( overlayMsg, link, overlayMsg->getSourceNode() );
509
510 return true;
511
512 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) )
513
514 //
515 // handle spovnet instance join requests
516 //
517
518 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) &&
519 state == BaseOverlayStateInitiator){
520
521 logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinRequest" );
522
523 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>();
524 logging_info( "received join request for spovnet " <<
525 joinReq->getSpoVNetID().toString() );
526
527 //
528 // make sure that the node actually wants to join
529 // the correct spovnet id that we administrate
530 //
531
532 if( joinReq->getSpoVNetID() != spovnetId ){
533 logging_error( "received join request for spovnet we don't handle " <<
534 joinReq->getSpoVNetID().toString() );
535 return false;
536 }
537
538 //
539 // only if all services allow the node to join it is allowed
540 // using the isJoinAllowed interface security policies can be
541 // implemented by higher layer services
542 //
543
544 bool allow = true;
545
546 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
547 allow &= i->isJoinAllowed( overlayMsg->getSourceNode(), spovnetId );
548 }
549
550 logging_info( "sending back join reply for spovnet " <<
551 spovnetId.toString() << " to node " <<
552 overlayMsg->getSourceNode().toString() <<
553 ". result: " << (allow ? "allowed" : "denied") );
554
555 joiningNodes.push_back( overlayMsg->getSourceNode() );
556
557 //
558 // send back our spovnetid, default overlay parameters,
559 // join allow result, and ourself as the endpoint
560 // to bootstrap the overlay against
561 //
562
563 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId );
564 JoinReply replyMsg( spovnetId, OverlayParameterSet::DEFAULT,
565 allow, getEndpointDescriptor() );
566
567 retmsg.encapsulate(&replyMsg);
568 bc->sendMessage( link, &retmsg );
569
570 return true;
571
572 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) && state == BaseOverlayStateInitiator)
573
574 //
575 // handle replies to spovnet instance join requests
576 //
577
578 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) &&
579 state == BaseOverlayStateJoinInitiated){
580
581 logging_debug( "baseoverlay received message of type OverlayMessageTypeJoinReply" );
582
583 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>();
584 logging_info( "received spovnet join reply" );
585
586 //
587 // make sure that we actually wanted to get
588 // into the spovnet whose id is in the message
589 //
590
591 if( replyMsg->getSpoVNetID() != spovnetId ){
592 logging_error( "received spovnet join reply for spovnet " <<
593 replyMsg->getSpoVNetID().toString() <<
594 " but we wanted to join spovnet " <<
595 spovnetId.toString() );
596
597 // state does not change here, maybe
598 // the reply does come in later
599 return false;
600 }
601
602 //
603 // if we did not get access to the spovnet
604 // notify of the failure and
605 // close the link to the initiator
606 //
607
608 if( ! replyMsg->getJoinAllowed() ){
609
610 logging_error( "our join request has been denied" );
611
612 bc->dropLink( initiatorLink );
613 initiatorLink = LinkID::UNSPECIFIED;
614 state = BaseOverlayStateInvalid;
615
616 // inform all registered services of the event
617 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
618 i->onJoinFail( spovnetId );
619 }
620
621 return true;
622 }
623
624 logging_info( "join request has been accepted for spovnet " << spovnetId.toString() );
625
626 //
627 // if we did get access to the spovnet
628 // we try to create the overlay structure
629 // as given in the reply message
630 //
631
632 overlayInterface = OverlayFactory::create( *this, replyMsg->getParam(), nodeId, this );
633
634 if( overlayInterface == NULL ){
635 logging_error( "overlay structure not supported" );
636
637 bc->dropLink( initiatorLink );
638 initiatorLink = LinkID::UNSPECIFIED;
639 state = BaseOverlayStateInvalid;
640
641 // inform all registered services of the event
642 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
643 i->onJoinFail( spovnetId );
644 }
645
646 return true;
647 }
648
649 //
650 // now start the join process for the overlay.
651 // the join process for the spovnet baseoverlay
652 // is now complete. we use the endpoint for
653 // overlay structure bootstrapping that the
654 // initiator provided in his reply message
655 //
656
657 state = BaseOverlayStateCompleted;
658 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN);
659
660 overlayInterface->createOverlay();
661 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() );
662
663 // inform all registered services of the event
664 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
665 i->onJoinSuccess( spovnetId );
666 }
667
668 return true;
669
670 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated)
671
672
673 //
674 // handle update messages for link establishment
675 //
676
677 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){
678
679 logging_debug( "baseoverlay received message of type OverlayMessageTypeUpdate" );
680
681 const NodeID& sourcenode = overlayMsg->getSourceNode();
682 const ServiceID& service = overlayMsg->getService();
683
684 //
685 // we should have a linkmapping for the link, otherwise
686 // we ignore update messages
687 //
688
689 LinkMapping::iterator i = linkMapping.find( link );
690 if( i == linkMapping.end() ){
691 logging_warn( "received overlay update message for link " <<
692 link.toString() << " for which we have no mapping" );
693 return false;
694 }
695
696 //
697 // update our link mapping information for this link
698 //
699
700 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service );
701
702 i->second.node = sourcenode;
703 i->second.service = service;
704
705 //
706 // if our link information changed, we send out an update, too
707 //
708
709 if( changed ){
710 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId );
711 bc->sendMessage( link, &overMsg );
712 }
713
714 //
715 // set the correct listener service for the linkitem
716 // now we can tell the registered service of the linkup event
717 //
718
719 if( !listenerMux.contains( service ) ){
720 logging_warn( "linkup event for service that has not been registered" );
721 return false;
722 }
723
724 ServiceInterface* iface = listenerMux.get( service );
725 if( iface == NULL ){
726 logging_warn( "linkup event for service that has been registered with a NULL interface" );
727 return true;
728 }
729
730 i->second.interface = iface;
731 iface->onLinkUp( link, nodeId, sourcenode );
732 i->second.markused();
733
734 return true ;
735
736 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) )
737
738 //
739 // bye messages to say goodbye
740 //
741
742 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye)){
743
744 logging_debug( "baseoverlay received message of type OverlayMessageTypeBye" );
745
746 logging_debug( "received bye message from " <<
747 overlayMsg->getSourceNode().toString() );
748
749 //
750 // if we are the initiator and receive a bye from a node
751 // the node just left. if we are a node and receive a bye
752 // from the initiator, we have to close, too.
753 //
754
755 if( overlayMsg->getSourceNode() == spovnetInitiator ){
756
757 bc->dropLink( initiatorLink );
758 initiatorLink = LinkID::UNSPECIFIED;
759 state = BaseOverlayStateInvalid;
760
761 logging_fatal( "initiator ended spovnet" );
762
763 // inform all registered services of the event
764 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
765 i->onLeaveFail( spovnetId );
766 }
767
768 } else {
769
770 // a node that said goodbye and we are the initiator
771 // don't have to do much here, as the node also
772 // will go out of the overlay structure
773 logging_info( "node left " << overlayMsg->getSourceNode() );
774
775 // inform all registered services of the event
776 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
777 i->onNodeLeave( overlayMsg->getSourceNode(), spovnetId );
778 }
779 }
780
781 return true;
782
783 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye))
784
785 //
786 // something wrong ...
787 //
788
789 else {
790
791 logging_error( "received message in invalid state! don't know " <<
792 "what to do with this message of type " <<
793 overlayMsg->getType() );
794 return false;
795
796 } // else
797
798 return false;
799}
800
801void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){
802
803 logging_debug( "broadcasting message to all known nodes " <<
804 "in the overlay from service " + service.toString() );
805
806 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes();
807
808 OverlayInterface::NodeList::iterator i = nodes.begin();
809 OverlayInterface::NodeList::iterator iend = nodes.end();
810
811 for( ; i != iend; i++ ){
812 if( *i == nodeId) continue; // don't send to ourselfs
813 sendMessage( message, *i, service );
814 }
815}
816
817vector<NodeID> BaseOverlay::getOverlayNeighbors() const {
818 // the known nodes _can_ also include our
819 // node, so we remove ourselfs
820
821 vector<NodeID> nodes = overlayInterface->getKnownNodes();
822 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId );
823 if( i != nodes.end() ) nodes.erase( i );
824
825 return nodes;
826}
827
828void BaseOverlay::updateOvlVis( const NodeID& n ) {
829 NodeID node = n;
830/* void visShowNodeBubble (
831 NETWORK_ID network,
832 NodeID& node,
833 string label
834 );
835*/
836 using namespace std;
837
838 if (node == nodeId || node.isUnspecified()) return;
839
840 // min/max
841 if ( node < min || min.isUnspecified() ) min = node;
842 if ( node > max || max.isUnspecified() ) max = node;
843
844 // successor
845 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {
846 if (!succ.isUnspecified() && node != succ)
847 ovl.visDisconnect(ovlId, nodeId, succ, string(""));
848 succ = node;
849 ovl.visConnect(ovlId, nodeId, succ, string(""));
850 }
851
852 // set successor (circle-wrap)
853 if (succ.isUnspecified() && !min.isUnspecified()) {
854 succ = min;
855 ovl.visConnect(ovlId, nodeId, succ, string(""));
856 }
857}
858
859const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const {
860
861 if( lid == LinkID::UNSPECIFIED ) return nodeId;
862
863 LinkMapping::const_iterator i = linkMapping.find( lid );
864 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED;
865 else return i->second.node;
866}
867
868void BaseOverlay::incomingRouteMessage(Message* msg){
869 // gets handled as normal data message
870 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED );
871}
872
873void BaseOverlay::onNodeJoin(const NodeID& node){
874
875 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node );
876 if( i == joiningNodes.end() ) return;
877
878 logging_info( "node has successfully joined baseoverlay and overlay structure "
879 << node.toString() );
880
881 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){
882 i->onNodeJoin( node, spovnetId );
883 }
884
885 joiningNodes.erase( i );
886}
887
888void BaseOverlay::eventFunction(){
889
890 list<LinkID> oldlinks;
891 time_t now = time(NULL);
892
893 // first gather all the links from linkMapping that need droppin
894 // don't directly drop, as the dropLink function affects the
895 // linkMapping structure that we are traversing here.
896 // drop links after a timeout of 30s
897
898 BOOST_FOREACH( LinkPair item, linkMapping ){
899 if( item.second.autolink && difftime(now, item.second.lastuse) > 30)
900 oldlinks.push_back( item.first );
901 }
902
903 BOOST_FOREACH( const LinkID lnk, oldlinks ){
904 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" );
905 dropLink( lnk );
906 }
907}
908
909}} // namespace ariba, overlay
Note: See TracBrowser for help on using the repository browser.