Changeset 3712
- Timestamp:
- May 26, 2009, 8:03:39 PM (16 years ago)
- Location:
- source/ariba
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/communication/BaseCommunication.cpp
r3699 r3712 217 217 } 218 218 219 // warn if this link has some queued messages attached220 if( descriptor.waitingmsg.size() > 0 ){221 logging_warn( "dropping link " << link.toString() <<222 " that has " << descriptor.waitingmsg.size() << " waiting messages" );223 }224 225 219 // create message to drop the link 226 220 logging_debug( "sending out link close request. for us, the link is closed now" ); … … 266 260 msg.encapsulate( const_cast<Message*>(message) ); 267 261 268 if( linkDesc.linkup ){ 269 270 // send message 271 transport->sendMessage( &msg ); 272 return ++currentSeqnum; 273 274 } else { 275 276 // queue message 277 logging_info( "link " << lid.toString() << " is not up yet, queueing message" ); 278 linkDesc.waitingmsg.push_back( new Message(msg) ); // TODO ooooo 279 280 return 0; 281 282 } // if( linkDesc.linkup ) 262 if( !linkDesc.linkup ){ 263 logging_error("cant send message on link " << lid.toString() << ", link not up"); 264 return -1; 265 } 266 267 // send message 268 transport->sendMessage( &msg ); 269 return ++currentSeqnum; 283 270 } 284 271 … … 466 453 } 467 454 468 if( linkDesc.waitingmsg.size() > 0 ){469 logging_info( "sending out queued messages on link " << linkDesc.localLink.toString() );470 471 BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){472 sendMessage( linkDesc.localLink, msg );473 delete msg;474 }475 476 linkDesc.waitingmsg.clear();477 }478 479 455 } // LINK_STATE_OPEN_REPLY 480 456 … … 503 479 i->onLinkDown( linkDesc.localLink, linkDesc.localLocator, linkDesc.remoteLocator ); 504 480 } 505 506 //507 // delete all pending messages for the link that has been closed508 //509 510 BOOST_FOREACH( Message* msg, linkDesc.waitingmsg ){511 delete msg;512 }513 514 linkDesc.waitingmsg.clear();515 481 516 482 // … … 579 545 if( (*i).localLink != localLink) continue; 580 546 581 BOOST_FOREACH( Message* msg, i->waitingmsg ){582 delete msg;583 }584 585 i->waitingmsg.clear();586 547 linkSet.erase( i ); 587 588 548 break; 589 549 } -
source/ariba/communication/BaseCommunication.h
r3690 r3712 46 46 #include <vector> 47 47 #include <iostream> 48 #include <deque>49 48 #include <algorithm> 50 49 #include <boost/foreach.hpp> … … 78 77 using std::map; 79 78 using std::vector; 80 using std::deque;81 79 using std::pair; 82 80 using std::make_pair; … … 256 254 remoteEndpoint(desc.remoteEndpoint), 257 255 linkup(desc.linkup) { 258 259 BOOST_FOREACH( Message* msg, desc.waitingmsg ){260 waitingmsg.push_back( msg );261 }262 256 } 263 257 … … 266 260 } 267 261 268 LinkID localLink;262 LinkID localLink; 269 263 const NetworkLocator* localLocator; 270 LinkID remoteLink;264 LinkID remoteLink; 271 265 const NetworkLocator* remoteLocator; 272 EndpointDescriptor remoteEndpoint; 273 274 bool linkup; 275 deque<Message*> waitingmsg; 266 EndpointDescriptor remoteEndpoint; 267 268 bool linkup; 276 269 }; 277 270 -
source/ariba/overlay/BaseOverlay.cpp
r3705 r3712 270 270 LinkMapping::iterator i = linkMapping.find( link ); 271 271 272 // find the link item to drop 272 273 if( i == linkMapping.end() ){ 273 274 logging_warn( "can't drop link, mapping unknown " << link.toString() ); … … 275 276 } 276 277 278 LinkItem item = i->second; 279 280 // delete all queued messages 281 if( item.waitingmsg.size() > 0 ){ 282 283 logging_warn( "dropping link " << link.toString() << 284 " that has " << item.waitingmsg.size() << " waiting messages" ); 285 286 item.deleteWaiting(); 287 } 288 289 // erase the mapping and drop the link 277 290 linkMapping.erase( i ); 278 279 LinkItem item = i->second;280 291 bc->dropLink( link ); 281 292 293 // tell sideports and listeners of the drop 282 294 item.interface->onLinkDown( link, item.node ); 283 295 sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId ); … … 287 299 288 300 logging_debug( "baseoverlay is sending data message on link " << link.toString() ); 301 302 // 303 // get the mapping for this link 304 // 289 305 290 306 LinkMapping::iterator i = linkMapping.find( link ); … … 294 310 } 295 311 296 OverlayMsg overmsg( 297 OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId ); 312 i->second.markused(); 313 314 // 315 // check if the link is up yet, if its an autlink queue message 316 // 317 318 if( !i->second.linkup ){ 319 320 if( i->second.autolink ){ 321 logging_info( "auto link " << link.toString() << " is not up yet, queueing message" ); 322 i->second.waitingmsg.push_back( new Message(*message) ); 323 } else { 324 logging_error("link " << link.toString() << " is not up yet, dropping message" ); 325 } 326 327 return -1; 328 } 329 330 // 331 // send the message through the basecomm 332 // 333 334 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId ); 298 335 overmsg.encapsulate( const_cast<Message*>(message) ); 299 336 300 i->second.markused();301 337 return bc->sendMessage( link, &overmsg ); 302 338 } … … 308 344 LinkMapping::iterator i = linkMapping.begin(); 309 345 LinkMapping::iterator iend = linkMapping.end(); 346 347 // 348 // see if we find a link for this node and service destination 349 // 310 350 311 351 for( ; i != iend; i++ ){ … … 316 356 } 317 357 358 // 359 // if we found no link, create an auto link 360 // 361 318 362 if( link == LinkID::UNSPECIFIED ){ 319 363 … … 322 366 ". creating auto link ..."); 323 367 368 // call basecomm to create a link 324 369 link = establishLink( node, service ); 370 371 // this will call onlinkup on us, if everything worked we now have a mapping 325 372 LinkMapping::iterator i = linkMapping.find( link ); 373 i->second.autolink = true; 326 374 327 375 if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){ … … 330 378 return -1; 331 379 } 332 333 i->second.autolink = true;334 380 335 381 logging_debug( "establishing autolink in progress to node " … … 531 577 sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId ); 532 578 579 // delete all queued messages 580 if( i->second.waitingmsg.size() > 0 ){ 581 582 logging_warn( "dropping link " << id.toString() << 583 " that has " << i->second.waitingmsg.size() << " waiting messages" ); 584 585 i->second.deleteWaiting(); 586 } 587 533 588 linkMapping.erase( i ); 534 589 } … … 788 843 789 844 // update our link mapping information for this link 790 bool changed = 791 ( i->second.node != sourcenode ) || 792 ( i->second.service != service ); 845 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service ); 793 846 i->second.node = sourcenode; 794 847 i->second.service = service; … … 818 871 819 872 // ask the service whether it wants to accept this link 820 if( iface->onLinkRequest(sourcenode) ){ 821 822 logging_debug("link " << link.toString() << 823 " has been accepted by service " << service.toString()); 824 825 // call the notification functions 826 iface->onLinkUp( link, sourcenode ); 827 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId ); 828 829 } else { 873 if( !iface->onLinkRequest(sourcenode) ){ 830 874 831 875 logging_debug("link " << link.toString() << … … 836 880 // drop the link 837 881 dropLink( link ); 838 } 882 883 return true; 884 } 885 886 // 887 // link has been accepted, link is now up, send messages out first 888 // 889 890 i->second.linkup = true; 891 logging_debug("link " << link.toString() << 892 " has been accepted by service " << service.toString() << " and is now up"); 893 894 if( i->second.waitingmsg.size() > 0 ){ 895 logging_info( "sending out queued messages on link " << link.toString() ); 896 897 BOOST_FOREACH( Message* msg, i->second.waitingmsg ){ 898 sendMessage( msg, link ); 899 delete msg; 900 } 901 902 i->second.waitingmsg.clear(); 903 } 904 905 // call the notification functions 906 iface->onLinkUp( link, sourcenode ); 907 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId ); 839 908 840 909 return true; -
source/ariba/overlay/BaseOverlay.h
r3690 r3712 46 46 #include <list> 47 47 #include <vector> 48 #include <deque> 48 49 #include <boost/foreach.hpp> 49 50 … … 80 81 using std::pair; 81 82 using std::find; 83 using std::deque; 82 84 83 85 using ariba::NodeListener; … … 420 422 421 423 LinkItem() : 422 link(LinkID::UNSPECIFIED), node(NodeID::UNSPECIFIED), service( 423 ServiceID::UNSPECIFIED), interface( 424 &CommunicationListener::DEFAULT), autolink(false), lastuse( 425 0) { 424 link(LinkID::UNSPECIFIED), 425 node(NodeID::UNSPECIFIED), 426 service(ServiceID::UNSPECIFIED), 427 interface(&CommunicationListener::DEFAULT), 428 autolink(false), 429 lastuse(0), 430 linkup(false){ 426 431 } 427 432 428 433 LinkItem(const LinkID& _link, const NodeID& _node, 429 434 const ServiceID& _service, CommunicationListener* _interface) : 430 link(_link), node(_node), service(_service), interface(_interface), 431 autolink(false), lastuse(time(NULL)) { 435 link(_link), 436 node(_node), 437 service(_service), 438 interface(_interface), 439 autolink(false), 440 lastuse(time(NULL)), 441 linkup(false){ 432 442 433 443 assert( _interface != NULL ); 434 444 } 435 445 446 LinkItem(const LinkItem& rh) : 447 link(rh.link), 448 node(rh.node), 449 service(rh.service), 450 interface(rh.interface), 451 autolink(rh.autolink), 452 lastuse(rh.lastuse), 453 linkup(rh.linkup){ 454 455 BOOST_FOREACH( Message* msg, rh.waitingmsg ){ 456 waitingmsg.push_back( msg ); 457 } 458 } 459 460 void deleteWaiting(){ 461 BOOST_FOREACH( Message* msg, waitingmsg ){ 462 delete msg; 463 } 464 waitingmsg.clear(); 465 } 466 436 467 // general information about the link 437 438 468 const LinkID link; 439 469 NodeID node; 440 470 ServiceID service; 441 471 CommunicationListener* interface; 472 bool linkup; 442 473 443 474 // information needed for auto links … … 448 479 bool autolink; 449 480 time_t lastuse; 481 deque<Message*> waitingmsg; 450 482 }; 451 483 … … 467 499 }; 468 500 469 } 470 } // namespace ariba, overlay 501 }} // namespace ariba, overlay 471 502 472 503 #endif /*BASEOVERLAY_H_*/ -
source/ariba/utility/system/StartupWrapper.cpp
r3690 r3712 108 108 // set up again an individual level if you like 109 109 { 110 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseOverlay")); 111 // logger->setLevel(log4cxx::Level::getInfo()); 110 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseOverlay")); 111 // logger->setLevel(log4cxx::Level::getDebug()); 112 } 113 { 114 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("BaseCommunication")); 115 // logger->setLevel(log4cxx::Level::getDebug()); 112 116 } 113 117
Note:
See TracChangeset
for help on using the changeset viewer.