Changeset 2483
- Timestamp:
- Feb 24, 2009, 10:06:43 PM (16 years ago)
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
etc/pingpong/settings_initiator.cnf
r2454 r2483 4 4 ariba.tcp.port = 5002 5 5 #ariba.udp.port = 6 ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})}6 #ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})} -
etc/pingpong/settings_node2.cnf
r2454 r2483 2 2 node.initiator = false 3 3 #ariba.ip.addr = 4 ariba.tcp.port = 50 104 ariba.tcp.port = 5004 5 5 #ariba.udp.port = 6 6 ariba.bootstrap.hints=pingpong{ip{127.0.0.1},tcp(ip,{5002})} -
sample/pingpong/PingPong.cpp
r2473 r2483 17 17 // construction 18 18 PingPong::PingPong() : pingId( 0 ) { 19 Timer::setInterval( 5000 ); 19 20 } 20 21 … … 70 71 71 72 // bind communication and node listener 72 node->bind( this);73 node->bind( this, PingPong::PINGPONG_ID);73 node->bind( this ); 74 node->bind( this, PingPong::PINGPONG_ID); 74 75 76 // start the ping timer. if we are not 77 // the initiator this will happen in onJoinCompleted 78 if( isInitiator ) Timer::start(); 79 75 80 // ping pong started up... 76 81 logging_info( "pingpong started up "); … … 85 90 Timer::stop(); 86 91 87 // unbind listeners92 // unbind communication and node listener 88 93 node->unbind( this ); 89 94 node->unbind( this, PingPong::PINGPONG_ID ); … … 108 113 109 114 // start the timer to ping every second 110 Timer::setInterval( 1000 );111 115 Timer::start(); 112 116 } … … 117 121 118 122 // communication listener 119 bool PingPong::onLinkRequest(const NodeID& remote, Message*msg) {123 bool PingPong::onLinkRequest(const NodeID& remote, const DataMessage& msg) { 120 124 return false; 121 125 } 122 126 123 void PingPong::onMessage(Message* msg, const NodeID& remote, 124 const LinkID& lnk = LinkID::UNSPECIFIED) { 127 void PingPong::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk) { 125 128 126 PingPongMessage* pingmsg = msg ->decapsulate<PingPongMessage> ();129 PingPongMessage* pingmsg = msg.getMessage()->decapsulate<PingPongMessage> (); 127 130 128 131 logging_info( "received ping message on link " << lnk.toString() 129 132 << " from node " << remote.toString() 130 << ": " << pingmsg-> toString() );133 << ": " << pingmsg->info() ); 131 134 } 132 135 … … 142 145 PingPongMessage pingmsg( pingId ); 143 146 node->sendBroadcastMessage( pingmsg, PingPong::PINGPONG_ID ); 144 145 147 } 146 148 -
sample/pingpong/PingPong.h
r2473 r2483 35 35 protected: 36 36 // communication listener interface 37 virtual bool onLinkRequest(const NodeID& remote, Message*msg);38 virtual void onMessage( Message* msg, const NodeID& remote, const LinkID& lnk);37 virtual bool onLinkRequest(const NodeID& remote, const DataMessage& msg); 38 virtual void onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk= LinkID::UNSPECIFIED); 39 39 40 40 // node listener interface … … 62 62 // the current ping id 63 63 unsigned long pingId; 64 64 65 }; 65 66 -
source/ariba/communication/BaseCommunication.cpp
r2482 r2483 85 85 86 86 for( ; i != iend; i++){ 87 logging_debug( "local locator found " +(*i)->toString() );87 logging_debug( "local locator found " << (*i)->toString() ); 88 88 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i); 89 89 … … 101 101 localDescriptor.locator = ipv4locator; 102 102 localDescriptor.isUnspec = false; 103 logging_info( "binding to addr = " +ipv4locator->toString() );103 logging_info( "binding to addr = " << ipv4locator->toString() ); 104 104 foundLocator = true; 105 105 break; … … 180 180 LinkID linkid = LinkID::create(); 181 181 182 logging_debug( "creating new local descriptor entry with local link id " +linkid.toString() );183 LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor );182 logging_debug( "creating new local descriptor entry with local link id " << linkid.toString() ); 183 LinkDescriptor linkDescriptor( linkid, local, LinkID::UNSPECIFIED, remote, descriptor, false ); 184 184 addLink( linkDescriptor ); 185 185 … … 189 189 // 190 190 191 logging_debug( "sending out base messages with request to open link to " +remote->toString() );191 logging_debug( "sending out base messages with request to open link to " << remote->toString() ); 192 192 AribaBaseMsg baseMsg( remote, AribaBaseMsg::LINK_STATE_OPEN_REQUEST, linkid, 193 193 LinkID::UNSPECIFIED ); … … 206 206 logging_error( "don't know the link you want to drop" ); 207 207 return; 208 } 209 210 // warn if this link has some queued messages attached 211 if( descriptor.waitingmsg.size() > 0 ){ 212 logging_warn( "dropping link " << link.toString() << 213 " that has " << descriptor.waitingmsg.size() << " waiting messages" ); 208 214 } 209 215 … … 231 237 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 232 238 233 logging_debug( "sending out message to link " +lid.toString() );239 logging_debug( "sending out message to link " << lid.toString() ); 234 240 235 241 // query local link info 236 242 LinkDescriptor& linkDesc = queryLocalLink(lid); 237 243 if( linkDesc.isUnspecified() ){ 238 logging_error( "don't know the link with id " +lid.toString() );239 return 0;244 logging_error( "don't know the link with id " << lid.toString() ); 245 return -1; 240 246 } 241 247 … … 251 257 msg.encapsulate( const_cast<Message*>(message) ); 252 258 253 // send message 254 transport->sendMessage( &msg ); 255 256 return ++currentSeqnum; 259 if( linkDesc.linkup ){ 260 261 // send message 262 transport->sendMessage( &msg ); 263 return ++currentSeqnum; 264 265 } else { 266 267 // queue message 268 logging_info( "link " << lid.toString() << " is not up yet, queueing message" ); 269 linkDesc.waitingmsg.push_back( new Message(msg) ); // TODO ooooo 270 271 return 0; 272 273 } // if( linkDesc.linkup ) 257 274 } 258 275 … … 300 317 301 318 AribaBaseMsg* spovmsg = ((Message*)message)->decapsulate<AribaBaseMsg>(); 302 logging_debug( "receiving base comm message of type " +spovmsg->getTypeString() );319 logging_debug( "receiving base comm message of type " << spovmsg->getTypeString() ); 303 320 304 321 // … … 351 368 const NetworkLocator* remoteLocator = dynamic_cast<const NetworkLocator*>(message->getSourceAddress()); 352 369 353 logging_debug( "localLocator=" + localLocator->toString() + " remoteLocator="+remoteLocator->toString()); 370 logging_debug( "localLocator=" << localLocator->toString() 371 << " remoteLocator=" << remoteLocator->toString()); 354 372 355 373 // ask the registered listeners if this link … … 372 390 373 391 LinkDescriptor linkDescriptor(localLink, localLocator, remoteLink, 374 remoteLocator, EndpointDescriptor(remoteLocator) );392 remoteLocator, EndpointDescriptor(remoteLocator), true); 375 393 376 394 logging_debug( "saving new link descriptor with " << … … 378 396 "[local locator " << localLocator->toString() << "] " << 379 397 "[remote link " << remoteLink.toString() << "] " << 380 "[remote locator " << remoteLocator->toString() << "]" ); 398 "[remote locator " << remoteLocator->toString() << "]" << 399 "[link up true]" ); 381 400 382 401 addLink( linkDescriptor ); … … 400 419 // the link is now open 401 420 // 402 421 403 422 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 404 423 i->onLinkUp( localLink, localLocator, remoteLocator ); … … 420 439 421 440 if (linkDesc.isUnspecified()) { 422 logging_warn("Failed to find local link " +spovmsg->getRemoteLink().toString());441 logging_warn("Failed to find local link " << spovmsg->getRemoteLink().toString()); 423 442 return false; 424 443 } 425 444 426 445 linkDesc.remoteLink = spovmsg->getLocalLink(); 427 logging_debug( "the link is now up with local link id " + spovmsg->getRemoteLink().toString() ); 446 linkDesc.linkup = true; 447 448 logging_debug( "the link is now up with local link id " << spovmsg->getRemoteLink().toString() ); 428 449 429 450 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ … … 431 452 } 432 453 454 if( linkDesc.waitingmsg.size() > 0 ){ 455 logging_info( "sending out queued messages on link " << linkDesc.localLink.toString() ); 456 457 BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){ 458 sendMessage( linkDesc.localLink, msg ); 459 delete msg; 460 } 461 462 linkDesc.waitingmsg.clear(); 463 } 464 433 465 } // LINK_STATE_OPEN_REPLY 434 466 … … 440 472 441 473 const LinkID& localLink = spovmsg->getRemoteLink(); 442 logging_debug( "received link close request for link " +localLink.toString() );474 logging_debug( "received link close request for link " << localLink.toString() ); 443 475 444 476 // … … 450 482 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 451 483 if (linkDesc.isUnspecified()) { 452 logging_warn("Failed to find local link " +localLink.toString());484 logging_warn("Failed to find local link " << localLink.toString()); 453 485 return false; 454 486 } … … 469 501 470 502 const LinkID& localLink = spovmsg->getRemoteLink(); 471 logging_debug( "received link update for link " +localLink.toString() );503 logging_debug( "received link update for link " << localLink.toString() ); 472 504 473 505 // … … 477 509 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 478 510 if (linkDesc.isUnspecified()) { 479 logging_warn("Failed to update local link " +localLink.toString());511 logging_warn("Failed to update local link " << localLink.toString()); 480 512 return false; 481 513 } … … 517 549 518 550 for( ; i != iend; i++){ 519 if( (*i).localLink == localLink){ 520 linkSet.erase( i ); 521 break; 522 } 551 if( (*i).localLink != localLink) continue; 552 553 BOOST_FOREACH( Message* msg, i->waitingmsg ){ 554 delete msg; 555 } 556 557 i->waitingmsg.clear(); 558 linkSet.erase( i ); 559 560 break; 523 561 } 524 562 } … … 568 606 info.type != NetworkChangeInterface::EventTypeAddressDelete ) return; 569 607 570 logging_info( "base communication is handling network address changes");608 logging_info( "base communication is handling network address changes" ); 571 609 572 610 // … … 610 648 611 649 for( ; i != iend; i++){ 612 logging_debug( "local locator found " +(*i)->toString() );650 logging_debug( "local locator found " << (*i)->toString() ); 613 651 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i); 614 652 -
source/ariba/communication/BaseCommunication.h
r2482 r2483 46 46 #include <vector> 47 47 #include <iostream> 48 #include <deque> 48 49 #include <algorithm> 49 50 #include <boost/foreach.hpp> … … 77 78 using std::map; 78 79 using std::vector; 80 using std::deque; 79 81 using std::pair; 80 82 using std::make_pair; … … 222 224 remoteLink(), 223 225 remoteLocator(NULL), 224 remoteEndpoint(EndpointDescriptor::UNSPECIFIED){ 226 remoteEndpoint(EndpointDescriptor::UNSPECIFIED), 227 linkup(false) { 225 228 } 226 229 227 230 LinkDescriptor(const LinkID& _localLink, const NetworkLocator*& _localLocator, 228 231 const LinkID& _remoteLink, const NetworkLocator*& _remoteLocator, 229 const EndpointDescriptor& _remoteEndpoint ) :232 const EndpointDescriptor& _remoteEndpoint, bool _linkup ) : 230 233 localLink(_localLink), 231 234 localLocator(_localLocator), 232 235 remoteLink(_remoteLink), 233 236 remoteLocator(_remoteLocator), 234 remoteEndpoint(_remoteEndpoint){ 237 remoteEndpoint(_remoteEndpoint), 238 linkup(_linkup) { 235 239 } 236 240 … … 240 244 remoteLink(desc.remoteLink), 241 245 remoteLocator(desc.remoteLocator), 242 remoteEndpoint(desc.remoteEndpoint){ 246 remoteEndpoint(desc.remoteEndpoint), 247 linkup(desc.linkup) { 248 249 BOOST_FOREACH( Message* msg, desc.waitingmsg ){ 250 waitingmsg.push_back( msg ); 251 } 243 252 } 244 253 … … 252 261 const NetworkLocator* remoteLocator; 253 262 EndpointDescriptor remoteEndpoint; 263 264 bool linkup; 265 deque<Message*> waitingmsg; 254 266 }; 255 267 -
source/ariba/overlay/BaseOverlay.cpp
r2473 r2483 69 69 // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W); 70 70 // } 71 72 // timer for auto link management 73 Timer::setInterval( 5000 ); 74 Timer::start(); 71 75 } 72 76 … … 75 79 logging_info("deleting base overlay"); 76 80 81 Timer::stop(); 77 82 bc.unregisterMessageReceiver( this ); 78 83 bc.unregisterEventListener( this ); … … 147 152 // 148 153 // create the overlay 149 // and bootstrap against ourselfs150 154 // 151 155 152 156 overlayInterface->createOverlay(); 157 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){ 158 i->onOverlayCreate( spovnetId ); 159 } 160 161 // 162 // bootstrap against ourselfs 163 // 164 153 165 overlayInterface->joinOverlay(); 166 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){ 167 i->onJoinSuccess( spovnetId ); 168 } 154 169 155 170 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); 156 171 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 157 158 // inform all registered services of the event159 BOOST_FOREACH( ServiceInterface* i, listenerMux.getOneList() ){160 i->onOverlayCreate( spovnetId );161 }162 172 } 163 173 … … 245 255 overmsg.encapsulate( const_cast<Message*>(message) ); 246 256 257 i->second.markused(); 247 258 return bc.sendMessage( link, &overmsg ); 248 259 } … … 263 274 264 275 if( link == LinkID::UNSPECIFIED ){ 265 logging_error( "no link could be found to send message to node " << 266 node.toString() << " for service " << service.toString() ); 267 return -1; 268 } 269 276 277 logging_info( "no link could be found to send message to node " << 278 node.toString() << " for service " << service.toString() << 279 ". creating auto link ..."); 280 281 const LinkID link = establishLink( node, service ); 282 LinkMapping::iterator i = linkMapping.find( link ); 283 284 if( i == linkMapping.end() ){ 285 logging_error( "failed to establish auto link to node " << node.toString() << 286 " for service " << service.toString() ); 287 return -1; 288 } 289 290 i->second.autolink = true; 291 292 } // if( link != LinkID::UNSPECIFIED ) 293 294 i->second.markused(); 270 295 return sendMessage( message, link ); 271 296 } … … 352 377 " on link " << id.toString() ); 353 378 354 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId ); 379 OverlayMsg overMsg( 380 OverlayMsg::OverlayMessageTypeUpdate, 381 i->second.service, 382 nodeId 383 ); 384 355 385 bc.sendMessage( id, &overMsg ); 386 i->second.markused(); 356 387 357 388 } // if( i == linkMapping.end() ) … … 397 428 if( i->second.interface != NULL ) 398 429 i->second.interface->onLinkChanged( id, nodeId, i->second.node ); 430 431 i->second.markused(); 399 432 } 400 433 … … 412 445 if( i->second.interface != NULL ) 413 446 i->second.interface->onLinkFail( id, nodeId, i->second.node ); 447 448 i->second.markused(); 414 449 } 415 450 … … 427 462 if( i->second.interface != NULL ) 428 463 i->second.interface->onLinkQoSChanged( id, nodeId, i->second.node, qos ); 464 465 i->second.markused(); 429 466 } 430 467 … … 434 471 OverlayMsg* overlayMsg = ((Message*)message)->decapsulate<OverlayMsg>(); 435 472 if( overlayMsg == NULL ) return false; 473 474 // mark the link as in action 475 LinkMapping::iterator item = linkMapping.find( link ); 476 if( item != linkMapping.end() ) item->second.markused(); 436 477 437 478 // … … 683 724 i->second.interface = iface; 684 725 iface->onLinkUp( link, nodeId, sourcenode ); 726 i->second.markused(); 685 727 686 728 return true ; … … 828 870 } 829 871 872 void BaseOverlay::eventFunction(){ 873 874 list<LinkID> oldlinks; 875 time_t now = time(NULL); 876 877 // first gather all the links from linkMapping that need droppin 878 // don't directly drop, as the dropLink function affects the 879 // linkMapping structure that we are traversing here. 880 // drop links after a timeout of 30s 881 882 // the macro gets confused if type is passed directly 883 // because of the comma in the pair definition 884 typedef pair<LinkID,LinkItem> pairitem; 885 886 BOOST_FOREACH( pairitem item, linkMapping ){ 887 if( item.second.autolink && difftime(now, item.second.lastuse) > 30) 888 oldlinks.push_back( item.first ); 889 } 890 891 BOOST_FOREACH( const LinkID lnk, oldlinks ){ 892 dropLink( lnk ); 893 } 894 } 895 830 896 }} // namespace ariba, overlay -
source/ariba/overlay/BaseOverlay.h
r2473 r2483 43 43 #include <iostream> 44 44 #include <algorithm> 45 #include <ctime> 45 46 #include <boost/foreach.hpp> 46 47 … … 50 51 #include "ariba/utility/misc/Demultiplexer.hpp" 51 52 #include "ariba/utility/logging/Logging.h" 53 #include "ariba/utility/system/Timer.h" 52 54 53 55 #include "ariba/communication/EndpointDescriptor.h" … … 67 69 using std::map; 68 70 using std::make_pair; 71 using std::pair; 69 72 70 73 using ariba::communication::EndpointDescriptor; … … 92 95 using ariba::utility::MessageSender; 93 96 using ariba::utility::seqnum_t; 97 using ariba::utility::Timer; 94 98 95 99 #define ovl OvlVis::instance() … … 109 113 public MessageReceiver, 110 114 public CommunicationEvents, 111 public OverlayStructureEvents { 115 public OverlayStructureEvents, 116 protected Timer { 112 117 113 118 use_logging_h( BaseOverlay ); … … 248 253 virtual void onNodeJoin( const NodeID& node ); 249 254 255 // for timer events 256 virtual void eventFunction(); 257 250 258 private: 251 259 … … 316 324 static const LinkItem UNSPECIFIED; 317 325 318 LinkItem( const LinkID& _link, const NodeID& _node, 326 LinkItem( const LinkID& _link, const NodeID& _node, 319 327 const ServiceID& _service, ServiceInterface* _interface ) 320 : link( _link ), node( _node ), service( _service ), interface( _interface ){ 328 : link( _link ), node( _node ), service( _service ), interface( _interface ), 329 autolink( false ), lastuse( time(NULL) ) { 321 330 } 331 332 // general information about the link 322 333 323 334 const LinkID link; … … 325 336 ServiceID service; 326 337 ServiceInterface* interface; 338 339 // information needed for auto links 340 341 void markused(){ 342 lastuse = time(NULL); 343 } 344 345 bool autolink; 346 time_t lastuse; 327 347 }; 328 348
Note:
See TracChangeset
for help on using the changeset viewer.