Changeset 6822 for source/ariba/overlay/BaseOverlay.cpp
- Timestamp:
- Nov 4, 2009, 11:20:00 AM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/overlay/BaseOverlay.cpp
r6797 r6822 56 56 57 57 #include "ariba/utility/visual/OvlVis.h" 58 #include "ariba/utility/visual/DddVis.h" 59 #include "ariba/utility/visual/ServerVis.h" 58 60 59 61 namespace ariba { 60 62 namespace overlay { 63 64 #define visual ariba::utility::DddVis::instance() 65 #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY 66 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 61 67 62 68 class ValueEntry { 63 69 public: 64 70 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)), 65 71 last_change(time(NULL)), value(value.clone()) { 66 72 } 67 73 … … 122 128 vector<Data> vect; 123 129 BOOST_FOREACH( ValueEntry& e, values ) 124 130 vect.push_back( e.get_value() ); 125 131 return vect; 126 132 } … … 163 169 if (verbose) 164 170 std::cout << "DHT: Republished value. Refreshing value timestamp." 165 << std::endl;171 << std::endl; 166 172 return; 167 173 } … … 171 177 if (verbose) 172 178 std::cout << "DHT: Added value to " 173 << " key=" << key << " with value=" << value << std::endl;179 << " key=" << key << " with value=" << value << std::endl; 174 180 entry.values.push_back( ValueEntry( value ) ); 175 181 entry.values.back().set_ttl(ttl); … … 181 187 if (verbose) 182 188 std::cout << "DHT: New key value pair " 183 << " key=" << key << " with value=" << value << std::endl;189 << " key=" << key << " with value=" << value << std::endl; 184 190 185 191 // add new entry … … 274 280 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { 275 281 BOOST_FOREACH( LinkDescriptor* lp, links ) 276 if ((communication ? lp->communicationId : lp->overlayId) == link)277 return lp;282 if ((communication ? lp->communicationId : lp->overlayId) == link) 283 return lp; 278 284 return NULL; 279 285 } … … 281 287 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { 282 288 BOOST_FOREACH( const LinkDescriptor* lp, links ) 283 if ((communication ? lp->communicationId : lp->overlayId) == link)284 return lp;289 if ((communication ? lp->communicationId : lp->overlayId) == link) 290 return lp; 285 291 return NULL; 286 292 } … … 313 319 // search for a descriptor that is already up 314 320 BOOST_FOREACH( LinkDescriptor* lp, links ) 315 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0)316 return lp;321 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0) 322 return lp; 317 323 // if not found, search for one that is about to come up... 318 324 BOOST_FOREACH( LinkDescriptor* lp, links ) 319 320 325 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 ) 326 return lp; 321 327 return NULL; 322 328 } … … 328 334 if (!ld->up) continue; 329 335 OverlayMsg msg( OverlayMsg::typeLinkAlive, 330 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode );336 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 331 337 if (ld->relayed) msg.setRouteRecord(true); 332 338 send_link( &msg, ld->overlayId ); … … 416 422 bool found = false; 417 423 BOOST_FOREACH(NodeID& id, nodes) 418 424 if (id == ld->remoteNode) found = true; 419 425 if (found) continue; 420 426 i++; 421 427 nodes.push_back(ld->remoteNode); 422 428 if ((i%1) == 1) s << "<tr style=\"background-color=#f0f0f0;\">"; 423 429 else s << "<tr>"; 424 430 s << "<td>" << ld->overlayId.toString().substr(0,4) << "..</td>"; 425 431 s << "<td>" << ld->remoteNode.toString().substr(0,4) << "..</td>"; … … 455 461 LinkDescriptor* rhsld = getDescriptor(rhs); 456 462 if (lhsld==NULL || rhsld==NULL 457 || !lhsld->up || !rhsld->up458 || lhsld->remoteNode != rhsld->remoteNode) return -1;463 || !lhsld->up || !rhsld->up 464 || lhsld->remoteNode != rhsld->remoteNode) return -1; 459 465 460 466 if ((lhsld->remoteLink^lhsld->overlayId)<(rhsld->remoteLink^lhsld->overlayId) ) … … 503 509 if (destination == nodeId) { 504 510 logging_warn("Sent message to myself. Handling message.") 505 Message msg;511 Message msg; 506 512 msg.encapsulate(message); 507 513 handleMessage( &msg, NULL ); … … 518 524 logging_warn("Could not send message. No relay hop found to " 519 525 << destination) 520 return -1;526 return -1; 521 527 } 522 528 } … … 528 534 if (next_id.isUnspecified()) { 529 535 logging_warn("Could not send message. No next hop found to " << 530 destination );536 destination ); 531 537 return -1; 532 538 } … … 568 574 if (ldr->relayed) { 569 575 logging_debug("Resolving direct link for relayed link to " 570 << ldr->remoteNode);576 << ldr->remoteNode); 571 577 ld = getRelayLinkTo( ldr->remoteNode ); 572 578 if (ld==NULL) { … … 592 598 593 599 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, 594 const ServiceID& service) {600 const ServiceID& service) { 595 601 message->setSourceNode(nodeId); 596 602 message->setDestinationNode(remote); … … 627 633 // relay link still used and alive? 628 634 if (ld==NULL 629 || !ld->isDirectVital()630 || difftime(route.used, time(NULL)) > 8) {635 || !ld->isDirectVital() 636 || difftime(route.used, time(NULL)) > 8) { 631 637 logging_info("Forgetting relay information to node " 632 638 << route.node.toString() ); … … 658 664 // handle relayed messages from real links only 659 665 if (ld == NULL 660 || ld->relayed661 || message->getSourceNode()==nodeId ) return;666 || ld->relayed 667 || message->getSourceNode()==nodeId ) return; 662 668 663 669 // update usage information … … 691 697 // route has a shorter hop count or old link is dead? yes-> replace 692 698 if (route.hops > message->getNumHops() 693 || rld == NULL694 || !rld->isDirectVital()) {699 || rld == NULL 700 || !rld->isDirectVital()) { 695 701 logging_info("Updating relay information to node " 696 << route.node.toString()697 << " reducing to " << message->getNumHops() << " hops.");702 << route.node.toString() 703 << " reducing to " << message->getNumHops() << " hops."); 698 704 route.hops = message->getNumHops(); 699 705 route.link = ld->overlayId; … … 710 716 route.used = time(NULL); 711 717 logging_info("Remembering relay information to node " 712 << route.node.toString());718 << route.node.toString()); 713 719 relay_routes.push_back(route); 714 720 } … … 739 745 740 746 BaseOverlay::BaseOverlay() : 741 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED),742 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid),743 sideport(&SideportListener::DEFAULT), started(false), counter(0) {747 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED), 748 spovnetId(SpoVNetID::UNSPECIFIED), state(BaseOverlayStateInvalid), 749 sideport(&SideportListener::DEFAULT), started(false), counter(0) { 744 750 dht = new DHT(); 745 751 localDHT = new DHT(); … … 818 824 state = BaseOverlayStateCompleted; 819 825 BOOST_FOREACH( NodeListener* i, nodeListeners ) 820 826 i->onJoinCompleted( spovnetId ); 821 827 822 828 //ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); … … 853 859 BOOST_FOREACH( LinkDescriptor* ld, links ) { 854 860 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) 855 servicelinks.push_back( ld->overlayId );861 servicelinks.push_back( ld->overlayId ); 856 862 } 857 863 858 864 // drop all service links 859 865 BOOST_FOREACH( LinkID lnk, servicelinks ) 860 866 dropLink( lnk ); 861 867 862 868 // let the node leave the spovnet overlay interface … … 867 873 // drop still open bootstrap links 868 874 BOOST_FOREACH( LinkID lnk, bootstrapLinks ) 869 875 bc->dropLink( lnk ); 870 876 871 877 // change to inalid state 872 878 state = BaseOverlayStateInvalid; 873 879 //ovl.visShutdown( ovlId, nodeId, string("") ); 880 881 visual.visShutdown(visualIdOverlay, nodeId, ""); 882 visual.visShutdown(visualIdBase, nodeId, ""); 874 883 875 884 // inform all registered services of the event … … 898 907 899 908 BOOST_FOREACH( NodeListener* i, nodeListeners ) 900 909 i->onJoinFailed( spovnetId ); 901 910 902 911 return; 903 912 } 913 914 visual.configure("127.0.0.1", 50005); 915 //visual.configure("141.3.161.8", 50005); 916 visual.visCreate(visualIdBase, nodeId, "", ""); 917 visual.visCreate(visualIdOverlay, nodeId, "", ""); 904 918 } 905 919 … … 907 921 908 922 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& remoteEp, 909 const NodeID& remoteId, const ServiceID& service ) {923 const NodeID& remoteId, const ServiceID& service ) { 910 924 911 925 // establish link via overlay … … 914 928 else 915 929 916 // establish link directly if only ep is known917 if (remoteId.isUnspecified())918 return establishDirectLink(remoteEp, service );930 // establish link directly if only ep is known 931 if (remoteId.isUnspecified()) 932 return establishDirectLink(remoteEp, service ); 919 933 920 934 } … … 922 936 /// call base communication's establish link and add link mapping 923 937 const LinkID BaseOverlay::establishDirectLink( const EndpointDescriptor& ep, 924 const ServiceID& service ) {938 const ServiceID& service ) { 925 939 926 940 /// find a service listener … … 941 955 /// establish link and add mapping 942 956 logging_info("Establishing direct link " << ld->communicationId.toString() 943 << " using " << ep.toString());957 << " using " << ep.toString()); 944 958 945 959 return ld->communicationId; … … 948 962 /// establishes a link between two arbitrary nodes 949 963 const LinkID BaseOverlay::establishLink( const NodeID& remote, 950 const ServiceID& service ) {964 const ServiceID& service ) { 951 965 952 966 // do not establish a link to myself! … … 967 981 // debug message 968 982 logging_info( 969 "Sending link request with"970 << " link=" << ld->overlayId.toString()971 << " node=" << ld->remoteNode.toString()972 << " serv=" << ld->service.toString()983 "Sending link request with" 984 << " link=" << ld->overlayId.toString() 985 << " node=" << ld->remoteNode.toString() 986 << " serv=" << ld->service.toString() 973 987 ); 974 988 … … 1023 1037 if( ld == NULL ) { 1024 1038 logging_error("Could not send message. " 1025 << "Link not found id=" << link.toString());1039 << "Link not found id=" << link.toString()); 1026 1040 return -1; 1027 1041 } … … 1050 1064 1051 1065 seqnum_t BaseOverlay::sendMessage(const Message* message, 1052 const NodeID& node, const ServiceID& service) {1066 const NodeID& node, const ServiceID& service) { 1053 1067 1054 1068 // find link for node and service … … 1060 1074 // debug output 1061 1075 logging_info( "No link to send message to node " 1062 << node.toString() << " found for service "1063 << service.toString() << ". Creating auto link ..."1076 << node.toString() << " found for service " 1077 << service.toString() << ". Creating auto link ..." 1064 1078 ); 1065 1079 … … 1183 1197 // already bound? yes-> warning 1184 1198 NodeListenerVector::iterator i = 1185 find( nodeListeners.begin(), nodeListeners.end(), listener );1199 find( nodeListeners.begin(), nodeListeners.end(), listener ); 1186 1200 if( i != nodeListeners.end() ) { 1187 1201 logging_warn("Node listener " << listener << " is already bound!" ); … … 1212 1226 1213 1227 void BaseOverlay::onLinkUp(const LinkID& id, 1214 const address_v* local, const address_v* remote) {1228 const address_v* local, const address_v* remote) { 1215 1229 logging_debug( "Link up with base communication link id=" << id ); 1216 1230 … … 1221 1235 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){ 1222 1236 logging_info( 1223 "Join has been initiated by me and the link is now up. " <<1224 "Sending out join request for SpoVNet " << spovnetId.toString()1237 "Join has been initiated by me and the link is now up. " << 1238 "Sending out join request for SpoVNet " << spovnetId.toString() 1225 1239 ); 1226 1240 1227 1241 // send join request message 1228 1242 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, 1229 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );1243 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1230 1244 JoinRequest joinRequest( spovnetId, nodeId ); 1231 1245 overlayMsg.encapsulate( &joinRequest ); … … 1249 1263 // -> wait for update message! 1250 1264 1251 // link mapping found? -> send update message with node-id and service id1265 // link mapping found? -> send update message with node-id and service id 1252 1266 } else { 1253 1267 logging_info( "onLinkUp descriptor (initiated locally):" << ld ); … … 1271 1285 // note: necessary to validate the link on the remote side! 1272 1286 logging_info( "Sending out update" << 1273 " for service " << ld->service.toString() <<1274 " with local node id " << nodeId.toString() <<1275 " on link " << ld->overlayId.toString() );1287 " for service " << ld->service.toString() << 1288 " with local node id " << nodeId.toString() << 1289 " on link " << ld->overlayId.toString() ); 1276 1290 1277 1291 // compile and send update message … … 1285 1299 1286 1300 void BaseOverlay::onLinkDown(const LinkID& id, 1287 const address_v* local, const address_v* remote) {1301 const address_v* local, const address_v* remote) { 1288 1302 1289 1303 // erase bootstrap links … … 1318 1332 1319 1333 void BaseOverlay::onLinkChanged(const LinkID& id, 1320 const address_v* oldlocal, const address_v* newlocal,1321 const address_v* oldremote, const address_v* newremote) {1334 const address_v* oldlocal, const address_v* newlocal, 1335 const address_v* oldremote, const address_v* newremote) { 1322 1336 1323 1337 // get descriptor for link … … 1335 1349 1336 1350 void BaseOverlay::onLinkFail(const LinkID& id, 1337 const address_v* local, const address_v* remote) {1351 const address_v* local, const address_v* remote) { 1338 1352 logging_debug( "Link fail with base communication link id=" << id ); 1339 1353 … … 1353 1367 1354 1368 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local, 1355 const address_v* remote, const QoSParameterSet& qos) {1369 const address_v* remote, const QoSParameterSet& qos) { 1356 1370 logging_debug( "Link quality changed with base communication link id=" << id ); 1357 1371 … … 1363 1377 1364 1378 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local, 1365 const address_v* remote ) {1379 const address_v* remote ) { 1366 1380 logging_debug("Accepting link request from " << remote->to_string() ); 1367 1381 return true; … … 1389 1403 if( joinReq->getSpoVNetID() != spovnetId ) { 1390 1404 logging_error( 1391 "Received join request for spovnet we don't handle " <<1392 joinReq->getSpoVNetID().toString() );1405 "Received join request for spovnet we don't handle " << 1406 joinReq->getSpoVNetID().toString() ); 1393 1407 return false; 1394 1408 } … … 1405 1419 assert( overlayInterface != NULL ); 1406 1420 logging_debug( "Using bootstrap end-point " 1407 << getEndpointDescriptor().toString() )1421 << getEndpointDescriptor().toString() ) 1408 1422 OverlayParameterSet parameters = overlayInterface->getParameters(); 1409 1423 OverlayMsg retmsg( OverlayMsg::typeJoinReply, 1410 OverlayInterface::OVERLAY_SERVICE_ID, nodeId );1424 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1411 1425 JoinReply replyMsg( spovnetId, parameters, 1412 1426 allow, getEndpointDescriptor() ); … … 1448 1462 // inform all registered services of the event 1449 1463 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1450 1464 i->onJoinFailed( spovnetId ); 1451 1465 1452 1466 delete replyMsg; … … 1459 1473 1460 1474 logging_debug( "Using bootstrap end-point " 1461 << replyMsg->getBootstrapEndpoint().toString() );1475 << replyMsg->getBootstrapEndpoint().toString() ); 1462 1476 1463 1477 // create overlay structure from spovnet parameter set … … 1468 1482 1469 1483 overlayInterface = OverlayFactory::create( 1470 *this, replyMsg->getParam(), nodeId, this );1484 *this, replyMsg->getParam(), nodeId, this ); 1471 1485 1472 1486 // overlay structure supported? no-> fail! … … 1503 1517 // inform all registered services of the event 1504 1518 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1505 1519 i->onJoinCompleted( spovnetId ); 1506 1520 1507 1521 delete replyMsg; … … 1526 1540 const ServiceID& service = overlayMsg->getService(); 1527 1541 logging_debug( "Received data for service " << service.toString() 1528 << " on link " << overlayMsg->getDestinationLink().toString() );1542 << " on link " << overlayMsg->getDestinationLink().toString() ); 1529 1543 1530 1544 // delegate data message 1531 1545 getListener(service)->onMessage( 1532 overlayMsg,1533 overlayMsg->getSourceNode(),1534 overlayMsg->getDestinationLink()1546 overlayMsg, 1547 overlayMsg->getSourceNode(), 1548 overlayMsg->getDestinationLink() 1535 1549 ); 1536 1550 … … 1550 1564 // update our link mapping information for this link 1551 1565 bool changed = 1552 ( ld->remoteNode != overlayMsg->getSourceNode() )1553 || ( ld->service != overlayMsg->getService() );1566 ( ld->remoteNode != overlayMsg->getSourceNode() ) 1567 || ( ld->service != overlayMsg->getService() ); 1554 1568 1555 1569 // set parameters … … 1591 1605 1592 1606 logging_debug("Link id=" << ld->overlayId.toString() << 1593 " has been denied by service " << ld->service.toString() << ", dropping link");1607 " has been denied by service " << ld->service.toString() << ", dropping link"); 1594 1608 1595 1609 // prevent onLinkDown calls to the service … … 1668 1682 if (ldn == NULL) { 1669 1683 logging_error( "No link request pending for " 1670 << overlayMsg->getDestinationLink().toString() );1684 << overlayMsg->getDestinationLink().toString() ); 1671 1685 return false; 1672 1686 } … … 1681 1695 // debug message 1682 1696 logging_debug( "Link request reply received. Establishing link" 1683 << " for service " << overlayMsg->getService().toString()1684 << " with local id=" << overlayMsg->getDestinationLink()1685 << " and remote link id=" << overlayMsg->getSourceLink()1686 << " to " << overlayMsg->getSourceEndpoint().toString()1697 << " for service " << overlayMsg->getService().toString() 1698 << " with local id=" << overlayMsg->getDestinationLink() 1699 << " and remote link id=" << overlayMsg->getSourceLink() 1700 << " to " << overlayMsg->getSourceEndpoint().toString() 1687 1701 ); 1688 1702 … … 1702 1716 if( ldn->messageQueue.size() > 0 ) { 1703 1717 logging_info( "Sending out queued messages on link " << 1704 ldn->overlayId.toString() );1718 ldn->overlayId.toString() ); 1705 1719 BOOST_FOREACH( Message* msg, ldn->messageQueue ) { 1706 1720 sendMessage( msg, ldn->overlayId ); … … 1715 1729 // try to replace relay link with direct link 1716 1730 ldn->communicationId = 1717 bc->establishLink( overlayMsg->getSourceEndpoint() );1731 bc->establishLink( overlayMsg->getSourceEndpoint() ); 1718 1732 1719 1733 return true; … … 1725 1739 if ( rld != NULL ) { 1726 1740 logging_debug("Keep-Alive for " << 1727 overlayMsg->getDestinationLink() );1741 overlayMsg->getDestinationLink() ); 1728 1742 if (overlayMsg->isRouteRecord()) 1729 1743 rld->routeRecord = overlayMsg->getRouteRecord(); … … 1765 1779 /// handles an incoming message 1766 1780 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld, 1767 const LinkID bcLink ) {1781 const LinkID bcLink ) { 1768 1782 logging_debug( "Handling message: " << message->toString()); 1769 1783 1770 1784 // decapsulate overlay message 1771 1785 OverlayMsg* overlayMsg = 1772 const_cast<Message*>(message)->decapsulate<OverlayMsg>();1786 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1773 1787 if( overlayMsg == NULL ) return false; 1774 1788 … … 1788 1802 // handle signaling messages (do not route!) 1789 1803 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && 1790 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) {1804 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { 1791 1805 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 1792 1806 delete overlayMsg; … … 1796 1810 // message for reached destination? no-> route message 1797 1811 if (!overlayMsg->getDestinationNode().isUnspecified() && 1798 1812 overlayMsg->getDestinationNode() != nodeId ) { 1799 1813 logging_debug("Routing message " 1800 << " from " << overlayMsg->getSourceNode()1801 << " to " << overlayMsg->getDestinationNode()1814 << " from " << overlayMsg->getSourceNode() 1815 << " to " << overlayMsg->getDestinationNode() 1802 1816 ); 1803 1817 route( overlayMsg ); … … 1818 1832 switch ( overlayMsg->getType() ) { 1819 1833 1820 1821 1822 1834 // data transport messages 1835 case OverlayMsg::typeData: 1836 ret = handleData(overlayMsg, ld); break; 1823 1837 1824 1838 // overlay setup messages 1825 1826 1827 1828 1839 case OverlayMsg::typeJoinRequest: 1840 ret = handleJoinRequest(overlayMsg, bcLink ); break; 1841 case OverlayMsg::typeJoinReply: 1842 ret = handleJoinReply(overlayMsg, bcLink ); break; 1829 1843 1830 1844 // link specific messages 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1845 case OverlayMsg::typeLinkRequest: 1846 ret = handleLinkRequest(overlayMsg, ld ); break; 1847 case OverlayMsg::typeLinkReply: 1848 ret = handleLinkReply(overlayMsg, ld ); break; 1849 case OverlayMsg::typeLinkUpdate: 1850 ret = handleLinkUpdate(overlayMsg, ld ); break; 1851 case OverlayMsg::typeLinkAlive: 1852 ret = handleLinkAlive(overlayMsg, ld ); break; 1853 case OverlayMsg::typeLinkDirect: 1854 ret = handleLinkDirect(overlayMsg, ld ); break; 1841 1855 1842 1856 // handle unknown message type 1843 1844 1857 default: { 1858 logging_error( "received message in invalid state! don't know " << 1845 1859 "what to do with this message of type " << overlayMsg->getType() ); 1846 1847 1848 1860 ret = false; 1861 break; 1862 } 1849 1863 } 1850 1864 … … 1910 1924 stabilizeLinks(); 1911 1925 stabilizeDHT(); 1912 } 1913 1926 updateVisual(); 1927 } 1928 1929 void BaseOverlay::updateVisual(){ 1930 1931 // 1932 // update base overlay structure 1933 // 1934 1935 static NodeID pre = NodeID::UNSPECIFIED; 1936 static NodeID suc = NodeID::UNSPECIFIED; 1937 1938 vector<NodeID> nodes = this->getOverlayNeighbors(false); 1939 1940 if(nodes.size() == 0){ 1941 1942 if(pre != NodeID::UNSPECIFIED){ 1943 visual.visDisconnect(visualIdOverlay, this->nodeId, pre, ""); 1944 pre = NodeID::UNSPECIFIED; 1945 } 1946 if(suc != NodeID::UNSPECIFIED){ 1947 visual.visDisconnect(visualIdOverlay, this->nodeId, suc, ""); 1948 suc = NodeID::UNSPECIFIED; 1949 } 1950 1951 } // if(nodes.size() == 0) 1952 1953 if(nodes.size() == 1){ 1954 // only one node, make this pre and succ 1955 // and then go into the node.size()==2 case 1956 //nodes.push_back(nodes.at(0)); 1957 1958 if(pre != nodes.at(0)){ 1959 pre = nodes.at(0); 1960 if(pre != NodeID::UNSPECIFIED) 1961 visual.visConnect(visualIdOverlay, this->nodeId, pre, ""); 1962 } 1963 } 1964 1965 if(nodes.size() == 2){ 1966 1967 // old finger 1968 if(nodes.at(0) != pre){ 1969 if(pre != NodeID::UNSPECIFIED) 1970 visual.visDisconnect(visualIdOverlay, this->nodeId, pre, ""); 1971 pre = NodeID::UNSPECIFIED; 1972 } 1973 if(nodes.at(1) != suc){ 1974 if(suc != NodeID::UNSPECIFIED) 1975 visual.visDisconnect(visualIdOverlay, this->nodeId, suc, ""); 1976 suc = NodeID::UNSPECIFIED; 1977 } 1978 1979 // connect with fingers 1980 if(pre == NodeID::UNSPECIFIED){ 1981 pre = nodes.at(0); 1982 if(pre != NodeID::UNSPECIFIED) 1983 visual.visConnect(visualIdOverlay, this->nodeId, pre, ""); 1984 } 1985 if(suc == NodeID::UNSPECIFIED){ 1986 suc = nodes.at(1); 1987 if(suc != NodeID::UNSPECIFIED) 1988 visual.visConnect(visualIdOverlay, this->nodeId, suc, ""); 1989 } 1990 1991 } //if(nodes.size() == 2) 1992 1993 // { 1994 // logging_error("================================"); 1995 // logging_error("my nodeid " << nodeId.get(MAX_KEYLENGTH-16, 16)); 1996 // logging_error("================================"); 1997 // if(nodes.size()>= 1){ 1998 // logging_error("real pre " << nodes.at(0).toString()); 1999 // logging_error("real pre " << nodes.at(0).get(MAX_KEYLENGTH-16, 16)); 2000 // } 2001 // if(nodes.size()>= 2){ 2002 // logging_error("real suc " << nodes.at(1).toString()); 2003 // logging_error("real suc " << nodes.at(1).get(MAX_KEYLENGTH-16, 16)); 2004 // } 2005 // logging_error("================================"); 2006 // if(pre == NodeID::UNSPECIFIED){ 2007 // logging_error("pre: unspecified"); 2008 // }else{ 2009 // unsigned int prei = pre.get(MAX_KEYLENGTH-16, 16); 2010 // logging_error("pre: " << prei); 2011 // } 2012 // if(suc == NodeID::UNSPECIFIED){ 2013 // logging_error("suc: unspecified"); 2014 // }else{ 2015 // unsigned int suci = suc.get(MAX_KEYLENGTH-16, 16); 2016 // logging_error("suc: " << suci); 2017 // } 2018 // logging_error("================================"); 2019 // } 2020 2021 // 2022 // update base communication links 2023 // 2024 2025 static set<NodeID> linkset; 2026 set<NodeID> remotenodes; 2027 BOOST_FOREACH( LinkDescriptor* ld, links ) { 2028 if (!ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) 2029 continue; 2030 2031 if (ld->routeRecord.size()>1 && ld->relayed) { 2032 for (size_t i=1; i<ld->routeRecord.size(); i++) 2033 remotenodes.insert( ld->routeRecord[ld->routeRecord.size()-i-1] ); 2034 } else { 2035 remotenodes.insert(ld->remoteNode); 2036 } 2037 } 2038 2039 // which links are old and need deletion? 2040 BOOST_FOREACH(NodeID n, linkset){ 2041 if(remotenodes.find(n) == remotenodes.end()){ 2042 visual.visDisconnect(visualIdBase, this->nodeId, n, ""); 2043 linkset.erase(n); 2044 } 2045 } 2046 2047 // which links are new and need creation? 2048 BOOST_FOREACH(NodeID n, remotenodes){ 2049 if(linkset.find(n) == linkset.end()){ 2050 visual.visConnect(visualIdBase, this->nodeId, n, ""); 2051 linkset.insert(n); 2052 } 2053 } 2054 2055 } 1914 2056 1915 2057 // ---------------------------------------------------------------------------- … … 1954 2096 if (!overlayInterface->isClosestNodeTo(msg->getDestinationNode())) { 1955 2097 logging_debug("Routing DHT message to closest node " 1956 << " from " << msg->getSourceNode()1957 << " to " << msg->getDestinationNode()2098 << " from " << msg->getSourceNode() 2099 << " to " << msg->getDestinationNode() 1958 2100 ); 1959 2101 route( msg ); … … 1966 2108 case OverlayMsg::typeDHTPut: { 1967 2109 BOOST_FOREACH( Data value, dhtMsg->getValues() ) 1968 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );2110 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() ); 1969 2111 break; 1970 2112 } … … 1987 2129 if (dhtMsg->hasValues()) { 1988 2130 BOOST_FOREACH( Data value, dhtMsg->getValues() ) 1989 dht->remove(dhtMsg->getKey(), value );2131 dht->remove(dhtMsg->getKey(), value ); 1990 2132 } else 1991 2133 dht->remove( dhtMsg->getKey() );
Note:
See TracChangeset
for help on using the changeset viewer.