Changeset 5871 for source/ariba/overlay
- Timestamp:
- Aug 11, 2009, 4:13:09 PM (15 years ago)
- Location:
- source/ariba/overlay
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/BaseOverlay.cpp
r5870 r5871 137 137 } 138 138 139 <<<<<<< .working140 // get used next hop towards node141 LinkDescriptor* rld = NULL;142 NodeID relayNode = NodeID::UNSPECIFIED;143 LinkID rlid = overlayInterface->getNextLinkId(id);144 if ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) {145 // get descriptor of first hop146 rld = getDescriptor(rlid);147 =======148 139 // iterate over all links and check for time boundaries 149 140 vector<LinkDescriptor*> oldlinks; … … 153 144 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10) 154 145 ld->relaying = false; 155 >>>>>>> .merge-rechts.r5869 156 157 <<<<<<< .working 158 // is first hop a relay path use local relay 159 if ( rld->relay ) relayNode = rld->localRelay; 160 161 // no-> a proper relay node has been found 162 else relayNode = rld->remoteNode; 163 } 164 ======= 146 165 147 // keep alives and not up? yes-> link connection request is stale! 166 148 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) { 167 >>>>>>> .merge-rechts.r5869168 149 169 150 // increase counter … … 181 162 if (!ld->up) continue; 182 163 183 <<<<<<< .working184 // get descriptor185 BOOST_FOREACH( LinkDescriptor* lp, links )186 if (lp->remoteNode == relayNode &&187 lp->service == OverlayInterface::OVERLAY_SERVICE_ID &&188 lp->relay == false &&189 lp->up)190 return lp;191 =======192 164 // drop links that are dropped and not used as relay 193 165 if (ld->dropAfterRelaying && !ld->relaying && !ld->autolink) { … … 200 172 oldlinks.push_back( ld ); 201 173 continue; 202 >>>>>>> .merge-rechts.r5869 174 } 203 175 204 176 // keep alives missed? yes-> … … 536 508 } 537 509 510 538 511 //ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); 539 512 logging_info( "Starting to join spovnet " << id.toString() << … … 543 516 544 517 // bootstrap against ourselfs 545 logging_ info("joining spovnet locally");518 logging_debug("joining spovnet locally"); 546 519 547 520 overlayInterface->joinOverlay(); … … 560 533 561 534 // bootstrap against another node 562 logging_ info("joining spovnet remotely against " << bootstrapEp.toString());535 logging_debug("joining spovnet remotely against " << bootstrapEp.toString()); 563 536 564 537 const LinkID& lnk = bc->establishLink( bootstrapEp ); 565 538 bootstrapLinks.push_back(lnk); 566 567 539 logging_info("join process initiated for " << id.toString() << "..."); 568 569 540 } 570 541 } … … 1227 1198 delete replyMsg; 1228 1199 1229 <<<<<<< .working1230 // inform all registered services of the event1231 BOOST_FOREACH( NodeListener* i, nodeListeners )1232 i->onJoinFailed( spovnetId );1233 =======1234 1200 } else { 1235 >>>>>>> .merge-rechts.r58691236 1201 1237 1202 // this is not the first bootstrap, just join the additional node … … 1241 1206 1242 1207 delete replyMsg; 1208 1243 1209 } // if( overlayInterface == NULL ) 1244 1210 … … 1247 1213 1248 1214 1249 <<<<<<< .working1250 =======1251 1215 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1252 1216 // get service … … 1255 1219 << " on link " << overlayMsg->getDestinationLink().toString() ); 1256 1220 1257 >>>>>>> .merge-rechts.r58691258 1221 // delegate data message 1259 1222 getListener(service)->onMessage( … … 1266 1229 } 1267 1230 1268 <<<<<<< .working 1269 ======= 1270 1271 >>>>>>> .merge-rechts.r5869 1231 1272 1232 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1273 1233 1274 <<<<<<< .working1275 //record bootstrap ep as good endpoint to join1276 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() );1277 1278 delete replyMsg;1279 ret = true;1280 break;1281 }1282 =======1283 1234 if( ld == NULL ) { 1284 1235 logging_warn( "received overlay update message for link for " … … 1287 1238 } 1288 1239 logging_info("Received type update message on link " << ld ); 1289 >>>>>>> .merge-rechts.r58691290 1240 1291 1241 // update our link mapping information for this link … … 1310 1260 } 1311 1261 1312 <<<<<<< .working1313 // delegate data message1314 listener->onMessage( overlayMsg,1315 overlayMsg->getSourceNode(), ld->overlayId );1316 =======1317 1262 // service registered? no-> error! 1318 1263 if( !communicationListeners.contains( ld->service ) ) { … … 1320 1265 return false; 1321 1266 } 1322 >>>>>>> .merge-rechts.r58691323 1267 1324 1268 // default or no service registered? … … 1467 1411 } 1468 1412 1469 <<<<<<< .working1470 =======1471 1413 /// handle a keep-alive message for a link 1472 1414 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { … … 1490 1432 logging_debug( "Received direct link replacement request" ); 1491 1433 1492 >>>>>>> .merge-rechts.r58691493 1434 /// get destination overlay link 1494 1435 LinkDescriptor* rld = getDescriptor( overlayMsg->getDestinationLink() ); … … 1505 1446 rld->relayed = false; 1506 1447 1507 <<<<<<< .working1508 // mark incoming link as relay1509 if (ld!=NULL) ld->markAsRelay();1510 1511 // am I the destination of this message? yes->1512 if (relayMsg->getDestNode() == nodeId ) {1513 // deliver relay message locally!1514 logging_debug("Relay message reached destination. Handling the message.");1515 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );1516 ret = true;1517 break;1518 }1519 1520 // create route message1521 OverlayMsg _overMsg( *overlayMsg );1522 RelayMessage _relayMsg( *relayMsg );1523 _relayMsg.setType( RelayMessage::typeRoute );1524 _overMsg.encapsulate( &_relayMsg );1525 =======1526 1448 // mark used and alive! 1527 1449 rld->setAlive(); 1528 1450 rld->setAutoUsed(); 1529 >>>>>>> .merge-rechts.r58691530 1451 1531 1452 // erase the original descriptor … … 1538 1459 logging_debug( "Handling message: " << message->toString()); 1539 1460 1540 <<<<<<< .working1541 // mark incoming link as relay1542 if (ld!=NULL) ld->markAsRelay();1543 1544 // am I the destination of this message? yes->1545 if (relayMsg->getDestNode() == nodeId ) {1546 // deliver relay message locally!1547 logging_debug("Relay message reached destination. Handling the message.");1548 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode );1549 ret = true;1550 break;1551 }1552 1553 // am I the relay for this message? yes->1554 if (relayMsg->getRelayNode() == nodeId ) {1555 logging_debug("I'm the relay for this message. Sending to destination.");1556 OverlayMsg _overMsg( *overlayMsg );1557 RelayMessage _relayMsg( *relayMsg );1558 _overMsg.encapsulate(&_relayMsg);1559 =======1560 1461 // decapsulate overlay message 1561 1462 OverlayMsg* overlayMsg = 1562 1463 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1563 1464 if( overlayMsg == NULL ) return false; 1564 >>>>>>> .merge-rechts.r58691565 1465 1566 1466 // refresh relay information … … 1651 1551 /// return the overlay neighbors 1652 1552 vector<NodeID> BaseOverlay::getOverlayNeighbors(bool deep) const { 1653 1553 // the known nodes _can_ also include our node, so we remove ourself 1654 1554 vector<NodeID> nodes = overlayInterface->getKnownNodes(deep); 1655 // the known nodes _can_ also include our node, so we remove ourself1656 1555 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); 1657 1556 if( i != nodes.end() ) nodes.erase( i ); … … 1688 1587 1689 1588 void BaseOverlay::eventFunction() { 1690 <<<<<<< .working1691 1692 // send keep-alive messages over established links1693 BOOST_FOREACH( LinkDescriptor* ld, links ) {1694 if (!ld->up) continue;1695 OverlayMsg overMsg( OverlayMsg::typeKeepAlive,1696 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );1697 sendMessage( &overMsg, ld );1698 }1699 1700 // iterate over all links and check for time boundaries1701 vector<LinkDescriptor*> oldlinks;1702 time_t now = time(NULL);1703 BOOST_FOREACH( LinkDescriptor* ld, links ) {1704 // remote used as relay flag1705 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10)1706 ld->usedAsRelay = false;1707 1708 // keep alives and not up? yes-> link connection request is stale!1709 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) {1710 1711 // increase counter1712 ld->keepAliveMissed++;1713 1714 // missed more than four keep-alive messages (10 sec)? -> drop link1715 if (ld->keepAliveMissed > 4) {1716 logging_info( "Link connection request is stale, closing: " << ld );1717 oldlinks.push_back( ld );1718 continue;1719 }1720 }1721 1722 if (!ld->up) continue;1723 1724 // drop links that are dropped and not used as relay1725 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) {1726 oldlinks.push_back( ld );1727 continue;1728 }1729 1730 // auto-link time exceeded?1731 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) {1732 oldlinks.push_back( ld );1733 continue;1734 }1735 1736 // keep alives missed? yes->1737 if ( difftime( now, ld->keepAliveTime ) > 2 ) {1738 1739 // increase counter1740 ld->keepAliveMissed++;1741 1742 // missed more than four keep-alive messages (4 sec)? -> drop link1743 if (ld->keepAliveMissed >= 8) {1744 logging_info( "Link is stale, closing: " << ld );1745 oldlinks.push_back( ld );1746 continue;1747 }1748 }1749 }1750 1751 // drop links1752 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) {1753 /*1754 vector<LinkID>::iterator it = std::find(1755 bootstrapLinks.begin(), bootstrapLinks.end(), ld->communicationId);1756 1757 if (!ld->communicationId.isUnspecified() && it != bootstrapLinks.end() ){1758 logging_info( "Not dropping initiator link: " << ld );1759 continue;1760 }1761 */1762 logging_info( "Link timed out. Dropping " << ld );1763 dropLink( ld->overlayId );1764 }1765 1766 // show link state1767 counter++;1768 if (counter>=4) showLinkState();1769 if (counter>=4 || counter<0) counter = 0;1770 =======1771 1589 stabilizeRelays(); 1772 1590 stabilizeLinks(); 1773 >>>>>>> .merge-rechts.r58691774 1591 } 1775 1592 -
source/ariba/overlay/modules/chord/Chord.cpp
r5870 r5871 62 62 63 63 // create routing table 64 <<<<<<< .working65 this->table = new chord_routing_table(_nodeid, 2);66 =======67 64 this->table = new chord_routing_table(_nodeid, 4); 68 >>>>>>> .merge-rechts.r586969 65 orphan_removal_counter = 0; 70 66 discovery_count = 0; … … 101 97 102 98 // establish link via base overlay 103 <<<<<<< .working104 return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID, remoteRelay );105 =======106 99 return baseoverlay.establishLink( endpoint, remote, 107 100 OverlayInterface::OVERLAY_SERVICE_ID ); 108 >>>>>>> .merge-rechts.r5869109 101 } 110 102 … … 127 119 msg.encapsulate(&dmsg); 128 120 129 <<<<<<< .working130 // get next hop131 const route_item* item = table->get_next_hop(destination);132 if (item!=NULL && !item->info.isUnspecified()) send(&cmsg, item->info);133 =======134 121 // send to node 135 122 baseoverlay.send_node( &msg, remote ); 136 >>>>>>> .merge-rechts.r5869137 123 } 138 124 … … 200 186 const LinkID& Chord::getNextLinkId( const NodeID& id ) const { 201 187 // get next hop 202 <<<<<<< .working203 const route_item* item = table->get_next_hop(destnode);204 =======205 188 const route_item* item = table->get_next_hop(id); 206 >>>>>>> .merge-rechts.r5869 207 208 <<<<<<< .working 209 // message for this node? yes-> delegate to base overlay 210 if (item->id == nodeid || destnode == nodeid) 211 baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); 212 ======= 189 213 190 // returns a unspecified id when this is itself 214 191 if (item == NULL || item->id == nodeid) 215 192 return LinkID::UNSPECIFIED; 216 >>>>>>> .merge-rechts.r5869 217 218 <<<<<<< .working 219 else { // no-> send to next hop 220 ChordMessage cmsg(ChordMessage::route, nodeid, destnode); 221 cmsg.encapsulate(msg); 222 send(&cmsg, item->info); 223 } 224 ======= 193 225 194 /// return routing info 226 195 return item->info; 227 >>>>>>> .merge-rechts.r5869 228 } 229 230 <<<<<<< .working 231 /// @see OverlayInterface.h 232 void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { 233 logging_debug("Redirect over Chord to node id=" << node.toString() 234 << " link id=" << link.toString() ); 235 ChordMessage cmsg(ChordMessage::route, nodeid, node); 236 cmsg.encapsulate(msg); 237 send(&cmsg, link); 238 } 239 240 /// @see OverlayInterface.h 241 const LinkID& Chord::getNextLinkId( const NodeID& id ) const { 242 // get next hop 243 const route_item* item = table->get_next_hop(id); 244 245 // returns a unspecified id when this is itself 246 if (item == NULL || item->id == nodeid) 247 return LinkID::UNSPECIFIED; 248 249 /// return routing info 250 return item->info; 251 } 252 253 ======= 254 >>>>>>> .merge-rechts.r5869 196 } 197 255 198 OverlayInterface::NodeList Chord::getKnownNodes(bool deep) const { 256 199 OverlayInterface::NodeList nodelist; … … 351 294 switch (m->getType()) { 352 295 353 <<<<<<< .working354 // invalid message355 case M::invalid:356 break;357 358 // route message with payload359 case M::route: {360 // find next hop361 const route_item* item = table->get_next_hop(m->getDestination());362 363 // next hop == myself?364 if (m->getDestination() == nodeid) { // yes-> route to base overlay365 logging_debug("Send message to baseoverlay");366 baseoverlay.incomingRouteMessage( m, item->info, remote );367 }368 // no-> route to next hop369 else {370 logging_debug("Route chord message to "371 << item->id.toString() << " (destination=" << m->getDestination() << ")");372 send(m, item->info);373 }374 break;375 }376 377 // discovery request378 case M::discovery: {379 =======380 296 // discovery request 381 297 case typeDiscovery: { 382 >>>>>>> .merge-rechts.r5869383 298 // decapsulate message 384 299 Discovery* dmsg = m->decapsulate<Discovery> (); … … 410 325 logging_debug("Discovery split: routing discovery message to successor " 411 326 << succ_item->id.toString() ); 412 <<<<<<< .working413 send(&cmsg_s, succ_item->info);414 =======415 327 send(&omsg, succ_item->info); 416 >>>>>>> .merge-rechts.r5869417 328 } 418 329 … … 426 337 logging_debug("Discovery split: routing discovery message to predecessor " 427 338 << pred_item->id.toString() ); 428 <<<<<<< .working429 send(&cmsg_p, pred_item->info);430 =======431 339 send( &omsg, pred_item->info); 432 >>>>>>> .merge-rechts.r5869433 340 } 434 341 } 435 342 // no-> route message 436 343 else { 437 <<<<<<< .working438 // find next hop439 const route_item* item = table->get_next_hop(m->getDestination());440 if (item == NULL || item->id == nodeid) break;441 logging_debug("routing discovery message to " <<442 item->id.toString() );443 send(m, item->info);444 =======445 344 baseoverlay.send( m, m->getDestinationNode() ); 446 >>>>>>> .merge-rechts.r5869447 345 } 448 346 break; … … 526 424 // remove orphan links 527 425 orphan_removal_counter++; 528 <<<<<<< .working529 if (orphan_removal_counter <0 || orphan_removal_counter >= 4) {530 =======531 426 if (orphan_removal_counter <0 || orphan_removal_counter >= 2) { 532 >>>>>>> .merge-rechts.r5869533 427 logging_info("Running orphan removal"); 534 428 orphan_removal_counter = 0;
Note:
See TracChangeset
for help on using the changeset viewer.