Changeset 12060
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (11 years ago)
- Files:
-
- 18 added
- 11 deleted
- 82 edited
Legend:
- Unmodified
- Added
- Removed
-
/
- Property svn:ignore
-
old new 1 build* 2 .* 1 *.kdev4
-
- Property svn:mergeinfo changed (with no actual effect on merging)
- Property svn:ignore
-
CMakeLists.txt
r10752 r12060 43 43 set(ariba_VERSION_MAJOR 0) 44 44 set(ariba_VERSION_MINOR 8) 45 set(ariba_VERSION_PATCH 2)45 set(ariba_VERSION_PATCH 99) 46 46 set(ariba_VERSION ${ariba_VERSION_MAJOR}.${ariba_VERSION_MINOR}.${ariba_VERSION_PATCH}) 47 47 … … 51 51 list(APPEND CMAKE_MODULE_PATH "${ariba_SOURCE_DIR}/CMakeModules") 52 52 53 # Provide some choices for CMAKE_BUILD_TYPE 54 set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS 55 "" Release Debug RelWithDebInfo MinSizeRel) 53 54 ## Add build type: Profiling ## 55 SET( CMAKE_CXX_FLAGS_PROFILING "-O2 -g -pg" CACHE STRING 56 "Flags used by the C++ compiler during profiling builds." 57 FORCE ) 58 SET( CMAKE_C_FLAGS_PROFILING "-O2 -g -pg" CACHE STRING 59 "Flags used by the C compiler during profiling builds." 60 FORCE ) 61 SET( CMAKE_EXE_LINKER_FLAGS_PROFILING 62 "" CACHE STRING 63 "Flags used for linking binaries during profiling builds." 64 FORCE ) 65 SET( CMAKE_SHARED_LINKER_FLAGS_PROFILING 66 "" CACHE STRING 67 "Flags used by the shared libraries linker during profiling builds." 68 FORCE ) 69 MARK_AS_ADVANCED( 70 CMAKE_CXX_FLAGS_PROFILING 71 CMAKE_C_FLAGS_PROFILING 72 CMAKE_EXE_LINKER_FLAGS_PROFILING 73 CMAKE_SHARED_LINKER_FLAGS_PROFILING ) 74 # Update the documentation string of CMAKE_BUILD_TYPE for GUIs 75 #SET( CMAKE_BUILD_TYPE "${CMAKE_BUILD_TYPE}" CACHE STRING 76 # "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel Profiling." 77 # FORCE ) 78 ## [Add build type: Profiling] ## 79 80 ## set default build type 81 IF(NOT CMAKE_BUILD_TYPE) 82 SET(CMAKE_BUILD_TYPE Release 83 CACHE STRING "Choose the type of build : None Debug Release RelWithDebInfo MinSizeRel Profiling." 84 FORCE) 85 ENDIF(NOT CMAKE_BUILD_TYPE) 86 message("---> Current build type is: ${CMAKE_BUILD_TYPE}") 87 88 89 90 91 ## Provide some choices for CMAKE_BUILD_TYPE 92 #set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS 93 # "" Release Debug RelWithDebInfo MinSizeRel) 56 94 57 95 # Explicitly add BUILD_SHARED_LIBS to the user interface and default to on -
etc/pingpong/settings_node1.cnf
r7532 r12060 1 node.name = nodeone2 ariba.endpoints = tcp{5003}3 ariba.bootstrap.hints=pingpong{ip{127.0.0.1};tcp{5002};broadcast}4 1 5 pingpong.name = nodeone 2 //JSON 3 { 4 "ariba": { 5 "spovnet_name": "pingpong", 6 7 "listen_on": [ 8 {"category": "TCPIP", "addr": "::", "port": 5003 } 9 ], 10 11 "bootstrap": { 12 "direct": [ 13 {"category": "TCPIP", "addr": "127.0.0.1", "port": 41322 } 14 ], 15 16 "broadcast": false, 17 "mdns": false, 18 "sdp": false 19 } 20 } 21 } 6 22 23 24 25 //node.name = nodeone 26 //ariba.endpoints = JSON { "endpoint_set": [ {"category": "TCPIP", "addr": "::", "port": 5003 } ] } 27 //ariba.bootstrap.hints=pingpong{broadcast;mdns;} 28 29 //pingpong.name = nodeone 30 -
etc/pingpong/settings_node2.cnf
r6179 r12060 1 node.name = nodetwo 2 ariba.endpoints = tcp{5004} 3 ariba.bootstrap.hints=pingpong{ip{127.0.0.1};tcp{5002}} 1 //JSON 2 { 3 "ariba": { 4 "spovnet_name": "pingpong", 5 6 "listen_on": [ 7 {"category": "TCPIP", "addr": "::", "port": 0 } 8 ], 9 10 "bootstrap": { 11 "direct": [ 12 {"category": "TCPIP", "addr": "127.0.0.1", "port": 5003 } 13 ], 14 15 "broadcast": false, 16 "mdns": false, 17 "sdp": false 18 } 19 } 20 } 4 21 5 pingpong.name = nodetwo -
pkg/ariba-config.cmake.in
r10700 r12060 42 42 43 43 # Import targets 44 include( ariba-targets.cmake)44 include("${CMAKE_CURRENT_LIST_DIR}/ariba-targets.cmake") 45 45 46 46 # Export include files -
sample/CMakeLists.txt
r10700 r12060 38 38 39 39 add_subdirectory(pingpong) 40 #add_subdirectory(debugging) 41 -
sample/pingpong/PingPong.cpp
r9990 r12060 3 3 #include "ariba/utility/visual/DddVis.h" 4 4 5 #include <boost/property_tree/ptree.hpp> 6 #include <boost/property_tree/json_parser.hpp> 7 8 5 9 using ariba::utility::Configuration; 6 10 using namespace ariba; … … 17 21 18 22 // construction 19 PingPong::PingPong() : pingId( 0 ) { 20 Timer::setInterval( 1000 ); 23 PingPong::PingPong( string config) : 24 node(), 25 pingId( 0 ), 26 config_file(config) 27 { 28 Timer::setInterval( 3000 ); 21 29 } 22 30 … … 26 34 27 35 // implementation of the startup interface 28 void PingPong::startup() { 29 36 void PingPong::startup() 37 { 38 using boost::property_tree::ptree; 39 using boost::property_tree::json_parser::read_json; 40 30 41 // set up logging 31 logging_rootlevel_ info();42 logging_rootlevel_debug(); 32 43 logging_classlevel_debug(PingPong); 33 44 34 logging_info( "starting up PingPong service ... " ); 35 36 // create ariba module 37 logging_debug( "creating ariba underlay module ... " ); 38 ariba = new AribaModule(); 39 40 Name spovnetName("pingpong"); 41 Name nodeName = Name::UNSPECIFIED; 42 this->name = string("<ping>"); 43 44 // get settings from configuration object 45 if( Configuration::haveConfig() ){ 46 Configuration& config = Configuration::instance(); 47 48 // get node name 49 if (config.exists("node.name")) 50 nodeName = config.read<string> ("node.name"); 51 52 // configure ariba module 53 if (config.exists("ariba.endpoints")) 54 ariba->setProperty("endpoints", config.read<string>("ariba.endpoints")); 55 if (config.exists("ariba.bootstrap.hints")) 56 ariba->setProperty("bootstrap.hints", config.read<string>("ariba.bootstrap.hints")); 57 if (config.exists("pingpong.name")) 58 name = config.read<string>("pingpong.name"); 59 60 // get visualization 61 if( config.exists("ariba.visual3d.ip") && config.exists("ariba.visual3d.port")){ 62 string ip = config.read<string>("ariba.visual3d.ip"); 63 unsigned int port = config.read<unsigned int>("ariba.visual3d.port"); 64 unsigned int color = config.exists("node.color") ? 65 config.read<unsigned int>("node.color") : 0; 66 ariba::utility::DddVis::instance().configure(ip, port, color); 67 } 68 69 } // if( Configuration::haveConfig() ) 70 71 // start ariba module 72 ariba->start(); 73 74 // create node and join 75 node = new Node( *ariba, nodeName ); 76 45 logging_info( "[PINGPONG]\t starting up PingPong service ... " ); 46 47 // read config 48 ptree config; 49 try 50 { 51 read_json(config_file, config); 52 } 53 catch ( exception& e ) 54 { 55 logging_warn("ERROR: Failed to read config file »" << config_file << "«: "); 56 logging_warn(e.what()); 57 logging_warn("---> Using fallback config."); 58 59 config.put("ariba.spovnet_name", "pingpong"); 60 } 61 62 name = "TODO"; 63 77 64 // bind communication and node listener 78 node->bind( this ); /*NodeListener*/ 79 node->bind( this, PingPong::PINGPONG_SERVICEID); /*CommunicationListener*/ 80 81 // start node module 82 node->start(); 83 84 // when initiating, you can define the overlay type, default is Chord [CHORD_OVERLAY] 85 SpoVNetProperties params; 86 //params.setBaseOverlayType( SpoVNetProperties::ONE_HOP_OVERLAY ); // alternative: OneHop 87 88 // initiate the spovnet 89 logging_info("initiating spovnet"); 90 node->initiate(spovnetName, params); 91 92 // join the spovnet 93 logging_info("joining spovnet"); 94 node->join(spovnetName); 65 node.bind( this ); /*NodeListener*/ 66 node.bind( this, PingPong::PINGPONG_SERVICEID); /*CommunicationListener*/ 67 68 // connecting 69 logging_debug( "connecting ... " ); 70 71 node.connect(config.get_child("ariba")); 95 72 96 73 // ping pong started up... 97 logging_info( " pingpong starting up with"98 << " [spovnetid " << node ->getSpoVNetId().toString() << "]"99 << " and [nodeid " << node ->getNodeId().toString() << "]" );74 logging_info( "[PINGPONG]\t pingpong starting up with" 75 << " [spovnetid " << node.getSpoVNetId().toString() << "]" 76 << " and [nodeid " << node.getNodeId().toString() << "]" ); 100 77 } 101 78 … … 103 80 void PingPong::shutdown() { 104 81 105 logging_info( " pingpong service starting shutdown sequence ..." );82 logging_info( "[PINGPONG]\t pingpong service starting shutdown sequence ..." ); 106 83 107 84 // stop timer … … 109 86 110 87 // leave spovnet 111 node ->leave();88 node.leave(); 112 89 113 90 // unbind communication and node listener 114 node->unbind( this ); /*NodeListener*/ 115 node->unbind( this, PingPong::PINGPONG_SERVICEID ); /*CommunicationListener*/ 116 117 // stop the ariba module 118 ariba->stop(); 119 120 // delete node and ariba module 121 delete node; 122 delete ariba; 91 node.unbind( this ); /*NodeListener*/ 92 node.unbind( this, PingPong::PINGPONG_SERVICEID ); /*CommunicationListener*/ 123 93 124 94 // now we are completely shut down … … 129 99 void PingPong::eventFunction() { 130 100 101 if ( node.getNeighborNodes().size() == 0) 102 { 103 logging_info( "[PINGPONG]\t +++ no neighbors +++" ); 104 return; 105 } 106 131 107 // we ping all nodes that are known in the overlay structure 132 108 // this can be all nodes (OneHop) overlay or just some neighbors … … 140 116 141 117 pingId++; 142 logging_info( "pinging overlay neighbors with ping id " << pingId ); 118 // logging_info( endl << "#################" << endl 119 logging_info( endl << "|||||||||| >>>>>>>>>>" << endl 120 << "[PINGPONG]\t PINGING overlay neighbors with ping id " << pingId ); 143 121 PingPongMessage pingmsg( pingId, name ); 144 122 145 123 //----------------------------------------------------------------------- 146 124 // Option 1: get all neighboring nodes and send the message to each … … 154 132 s = s+names[i]; 155 133 } 156 logging_info(" ----> I am " << name << " and I know " << s);134 logging_info("[PINGPONG]\t ----> I am " << name << " and I know " << s); 157 135 names.clear(); 158 136 } 159 137 160 vector<NodeID> nodes = node ->getNeighborNodes();138 vector<NodeID> nodes = node.getNeighborNodes(); 161 139 BOOST_FOREACH( NodeID nid, nodes ){ 162 logging_info( " sending ping message to " << nid.toString() );163 node ->sendMessage( pingmsg, nid, PingPong::PINGPONG_SERVICEID );140 logging_info( "[PINGPONG]\t sending ping message to " << nid.toString() ); 141 node.sendMessage( pingmsg, nid, PingPong::PINGPONG_SERVICEID ); 164 142 } 165 143 … … 195 173 for (int i=0;i<names.size(); i++) if (names[i]==pingmsg->getName()) found=true; 196 174 if (!found) names.push_back(pingmsg->getName()); 197 logging_info( "received ping message on link " << lnk.toString() 175 // logging_info( endl << "#################" << endl 176 logging_info( endl << "<<<<<<<<<< ||||||||||" << endl 177 << "[PINGPONG]\t RECEIVED ping message on link " 178 << lnk.toString() 179 << " " << (node.isLinkDirect(lnk) ? "[DIRECT-LINK]" : "[INDIRECT-LINK]") 180 << " HopCount: " << node.getHopCount(lnk) 198 181 << " from node " << remote.toString() 182 << " (" << pingmsg->getName() << ")" 199 183 << ": " << pingmsg->info() ); 200 184 delete pingmsg; -
sample/pingpong/PingPong.h
r10570 r12060 33 33 34 34 public: 35 PingPong( );35 PingPong( string config ); 36 36 virtual ~PingPong(); 37 37 … … 61 61 private: 62 62 // the ariba module and a node 63 AribaModule* ariba;64 Node *node;63 // AribaModule* ariba; 64 Node node; 65 65 string name; 66 66 int counter; … … 73 73 unsigned long pingId; 74 74 75 string config_file; 75 76 }; 76 77 -
sample/pingpong/main.cpp
r5151 r12060 10 10 11 11 // get config file 12 string config = "../etc/settings.cnf";12 string config; 13 13 if (argc >= 2) config = argv[1]; 14 14 15 StartupWrapper::initConfig( config );15 // StartupWrapper::initConfig( config ); 16 16 StartupWrapper::startSystem(); 17 17 18 18 // this will do the main functionality and block 19 PingPong ping ;19 PingPong ping(config); 20 20 StartupWrapper::startup(&ping); 21 21 -
source/ariba/CMakeLists.txt
r11885 r12060 46 46 ### Increment this whenever the interface changes! ### 47 47 ###################################################### 48 set(ariba_SOVERSION 1)48 set(ariba_SOVERSION 2) 49 49 ###################################################### 50 50 … … 184 184 add_headers( 185 185 ariba.h 186 AribaModule.h186 # AribaModule.h 187 187 CommunicationListener.h 188 188 DataMessage.h … … 199 199 200 200 add_sources( 201 AribaModule.cpp201 # AribaModule.cpp 202 202 CommunicationListener.cpp 203 203 DataMessage.cpp -
source/ariba/CommunicationListener.cpp
r7468 r12060 59 59 60 60 void CommunicationListener::onLinkFail(const LinkID& l, const NodeID& r) { 61 onLinkDown(l, r); 61 62 } 62 63 … … 69 70 } 70 71 72 // this implementation provides backward compatibility 73 // overwrite it to use the new interface 74 void CommunicationListener::onMessage(reboost::shared_buffer_t message, 75 const NodeID& remote, 76 const LinkID& lnk, 77 const SequenceNumber& seqnum, 78 const ariba::overlay::OverlayMsg* overlay_msg) 79 { 80 // copy data 81 Data data; 82 data.setLength(message.size() * 8); 83 memcpy(data.getBuffer(), message.data(), message.size()); 84 85 // and prepare old-style overlay message 86 Message legacy_msg; 87 legacy_msg.setPayload(data); 88 89 90 // * call legacy handler * 91 this->onMessage(legacy_msg, remote, lnk); 92 } 93 71 94 void CommunicationListener::onKeyValue( const Data& key, const vector<Data>& value ) { 72 95 } 73 96 97 bool CommunicationListener::onPing(const NodeID& remote) 98 { 99 return true; 100 } 101 102 void CommunicationListener::onPingLost(const NodeID& remote) 103 { 104 } 105 106 void CommunicationListener::onPong(const NodeID& remote) 107 { 108 } 109 74 110 } // namespace ariba -
source/ariba/CommunicationListener.h
r9684 r12060 44 44 #include "LinkProperties.h" 45 45 #include "DataMessage.h" 46 #include "ariba/overlay/SequenceNumber.h" 46 47 47 48 namespace ariba { 49 50 typedef ariba::overlay::SequenceNumber SequenceNumber; 48 51 49 52 // forward decl 50 53 namespace overlay { 51 54 class BaseOverlay; 55 class OverlayMsg; 52 56 } 53 57 … … 116 120 117 121 /** 122 * @DEPRECATED 123 * 118 124 * Called when a message is incoming 119 125 * @param msg The data message that is received … … 123 129 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 124 130 const LinkID& lnk = LinkID::UNSPECIFIED); 131 132 133 /** 134 * NOTE: This interface is still unstable and may change in future releases. 135 * ---> Especially the parameter «overlay_msg» may be replaced or removed. 136 * 137 * Called when a message is incoming 138 * @param msg A shared buffer with the (serialized) payload 139 * @param remote The remote node that sent the message 140 * @param lnk The link id of the link where the message is received 141 * @param overlay_msg A pointer to the ariba internal Overlay Message 142 */ 143 virtual void onMessage(reboost::shared_buffer_t message, 144 const NodeID& remote, 145 const LinkID& lnk, 146 const SequenceNumber& seqnum, 147 const ariba::overlay::OverlayMsg* overlay_msg); 125 148 126 149 // --- dht functionality --- … … 133 156 virtual void onKeyValue( const Data& key, const vector<Data>& value ); 134 157 158 // --- ping pong (routed) --- 159 /** 160 * Called when a ping message was received. 161 * 162 * @param remote Node which sent the ping message 163 * @return bool Allows to send a pong message (default: true) 164 */ 165 virtual bool onPing(const NodeID& remote); 166 167 /** 168 * Called when a ping message is lost. 169 * 170 * NOTE: A ping message could also get lost without notice. 171 * 172 * @param remote Node to which the ping message was sent 173 */ 174 virtual void onPingLost(const NodeID& remote); 175 176 /** 177 * Called when a pong message was received. 178 * 179 * @param remote Node which sent the pong message 180 */ 181 virtual void onPong(const NodeID& remote); 135 182 }; 136 183 -
source/ariba/DataMessage.h
r9684 r12060 9 9 // use message utility 10 10 #ifdef USE_MESSAGE_UTILITY 11 #include "ariba/utility/messages.h" 11 // #include "ariba/utility/messages.h" 12 #include "ariba/utility/messages/Message.h" 12 13 namespace ariba { 13 14 typedef utility::Message Message; -
source/ariba/Message.h
r9684 r12060 41 41 42 42 #include <inttypes.h> 43 #include "ariba/utility/messages.h" 43 44 // legacy messages 45 #include "ariba/utility/messages/Message.h" 46 // reboost messages 47 #include "ariba/utility/transport/messages/message.hpp" 44 48 45 49 /** \addtogroup public -
source/ariba/Node.cpp
r10653 r12060 39 39 #include "Node.h" 40 40 41 #include <boost/foreach.hpp> 42 41 43 #include "ariba/overlay/BaseOverlay.h" 44 #include "ariba/communication/BaseCommunication.h" 45 42 46 #include "ariba/utility/types/OverlayParameterSet.h" 43 47 #include "ariba/communication/EndpointDescriptor.h" 44 48 49 #include <boost/property_tree/exceptions.hpp> 50 51 using namespace std; 45 52 using ariba::communication::EndpointDescriptor; 53 using boost::property_tree::ptree; 46 54 47 55 namespace ariba { 48 56 49 Node::Node(AribaModule& ariba_mod, const Name& node_name) : 50 name(node_name), ariba_mod(ariba_mod) { 51 base_overlay = new BaseOverlay(); 57 //Node::Node(AribaModule& ariba_mod, const Name& node_name) : 58 // name(node_name), ariba_mod(ariba_mod) { 59 // base_overlay = new BaseOverlay(); 60 //} 61 62 Node::Node() : 63 name(Name::UNSPECIFIED), 64 base_communication(NULL), 65 base_overlay(NULL) 66 { 67 base_communication = new BaseCommunication(); 68 base_overlay = new BaseOverlay(); 52 69 } 53 70 … … 55 72 delete base_overlay; 56 73 base_overlay = NULL; 57 } 58 59 void Node::join(const Name& vnetname) { 60 spovnetId = vnetname.toSpoVNetId(); 61 nodeId = generateNodeId(name); 62 63 // start base comm if not started 64 if( !ariba_mod.base_comm->isStarted() ) 65 ariba_mod.base_comm->start(); 66 67 // start base overlay if not started 68 // join against ourselfs 69 if( !base_overlay->isStarted() ) 70 base_overlay->start( *ariba_mod.base_comm, nodeId ); 71 base_overlay->joinSpoVNet( spovnetId ); 72 73 // join against static bootstrap points and 74 // start automatic bootstrapping modules 75 vector<AribaModule::BootstrapMechanism> mechanisms 76 = ariba_mod.getBootstrapMechanisms(vnetname); 77 78 vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 79 80 BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){ 81 switch(m){ 82 case AribaModule::BootstrapMechanismStatic: 83 { 84 const communication::EndpointDescriptor* ep = 85 ariba_mod.getBootstrapNode(vnetname, m); 86 if( ep != NULL && ep->isUnspecified() == false ) 87 base_overlay->joinSpoVNet( spovnetId, *ep); 88 break; 89 } 90 case AribaModule::BootstrapMechanismBroadcast: 91 internalmodules.push_back(make_pair( 92 BootstrapManager::BootstrapTypePeriodicBroadcast, 93 ariba_mod.getBootstrapInfo(vnetname, m))); 94 break; 95 case AribaModule::BootstrapMechanismMulticastDNS: 96 internalmodules.push_back(make_pair( 97 BootstrapManager::BootstrapTypeMulticastDns, 98 ariba_mod.getBootstrapInfo(vnetname, m))); 99 break; 100 case AribaModule::BootstrapMechanismSDP: 101 internalmodules.push_back(make_pair( 102 BootstrapManager::BootstrapTypeBluetoothSdp, 103 ariba_mod.getBootstrapInfo(vnetname, m))); 104 break; 105 default: 106 break; 107 } 108 } 109 110 // start automatic overlay bootstrapping modules 111 base_overlay->startBootstrapModules(internalmodules); 112 113 // done 114 } 115 116 void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) { 117 utility::OverlayParameterSet ovrpset; 118 ovrpset.setOverlayStructure( 119 (utility::OverlayParameterSet::_OverlayStructure) 120 parm.getBaseOverlayType() 121 ); 122 123 spovnetId = vnetname.toSpoVNetId(); 124 nodeId = generateNodeId(name); 125 126 // start base comm if not started 127 if( !ariba_mod.base_comm->isStarted() ) 128 ariba_mod.base_comm->start(); 129 130 // start base overlay if not started 131 if( !base_overlay->isStarted() ) 132 base_overlay->start( *ariba_mod.base_comm, nodeId ); 133 134 base_overlay->createSpoVNet( spovnetId, ovrpset ); 135 } 74 75 delete base_communication; 76 base_communication = NULL; 77 } 78 79 void Node::connect(const ptree& config) 80 { 81 using namespace boost::property_tree; 82 using namespace addressing2; 83 84 assert( ! base_communication->isStarted() ); 85 assert( ! base_overlay->isStarted() ); 86 87 // XXX needed since »empty_ptree<ptree>()« is not working 88 // ---> see: http://stackoverflow.com/questions/5003549/where-is-boost-property-treeempty-ptree 89 static const ptree empty_pt; 90 91 // SpovNet ID 92 Name spovnet_name(config.get<string>("spovnet_name")); 93 spovnetId = spovnet_name.toSpoVNetId(); 94 95 // Node ID 96 try 97 { 98 name = config.get<string>("node_name"); 99 } 100 catch ( ptree_bad_path& e ) 101 { 102 name = Name::UNSPECIFIED; 103 } 104 nodeId = generateNodeId(name); 105 106 107 108 /* Base Communication */ 109 EndpointSetPtr listen_on; 110 try 111 { 112 listen_on = endpoint_set::create_EndpointSet( 113 config.get_child("listen_on")); 114 } 115 catch ( ptree_bad_path& e ) 116 { 117 /* no endpoints specified, using default: »[::]:41322+« */ 118 119 ptree default_listen_on; 120 default_listen_on.put("endp.category", "TCPIP"); 121 default_listen_on.put("endp.addr", "::"); 122 default_listen_on.put("endp.port", 0); // defaults to 41322 (or higher) 123 124 listen_on = endpoint_set::create_EndpointSet(default_listen_on); 125 // logging_warn("No endpoints specified in config. ---> Using default."); 126 cout << "No endpoints specified in config. ---> Using default." << endl; 127 } 128 base_communication->start(listen_on); 129 130 // TODO maybe notify the upper layer whether we have any active endpoints 131 132 133 /* Base Overlay */ 134 base_overlay->start( base_communication, nodeId ); 135 136 base_overlay->createSpoVNet( spovnetId ); 137 base_overlay->joinSpoVNet( spovnetId ); 138 139 140 141 /* Bootstrap */ 142 const ptree& bootstrap_pt = config.get_child("bootstrap", empty_pt); 143 144 // Static Bootstrap 145 try 146 { 147 // read endpoint_set from config 148 EndpointSetPtr ep_set = endpoint_set::create_EndpointSet( 149 bootstrap_pt.get_child("direct")); 150 151 EndpointDescriptor ep = EndpointDescriptor::UNSPECIFIED(); 152 ep.replace_endpoint_set(ep_set); 153 154 // try to connect 155 base_overlay->joinSpoVNet( spovnetId, ep); 156 } 157 catch ( ptree_bad_path& e ) 158 { 159 // logging_info("No direct bootstrap info in config."); 160 cout << "No direct bootstrap info in config." << endl; 161 } 162 163 164 /* Bootstrap modules */ 165 vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 166 167 // Bootstrap: Broadcast 168 if ( bootstrap_pt.get("broadcast", false) ) 169 { 170 internalmodules.push_back(make_pair( 171 BootstrapManager::BootstrapTypePeriodicBroadcast,"")); 172 } 173 174 // Bootstrap: MDNS 175 if ( bootstrap_pt.get("mdns", false) ) 176 { 177 internalmodules.push_back(make_pair( 178 BootstrapManager::BootstrapTypeMulticastDns,"")); 179 } 180 181 // Bootstrap: SDP 182 if ( bootstrap_pt.get("sdp", false) ) 183 { 184 internalmodules.push_back(make_pair( 185 BootstrapManager::BootstrapTypeBluetoothSdp,"")); 186 } 187 188 // start automatic overlay bootstrapping modules 189 base_overlay->startBootstrapModules(internalmodules); 190 } 191 192 //void Node::join(const Name& vnetname) { 193 // spovnetId = vnetname.toSpoVNetId(); 194 // nodeId = generateNodeId(name); 195 // 196 // // start base comm if not started 197 // if( !ariba_mod.base_comm->isStarted() ) 198 // ariba_mod.base_comm->start(); 199 // 200 // // start base overlay if not started 201 // // join against ourselfs 202 // if( !base_overlay->isStarted() ) 203 // base_overlay->start( *ariba_mod.base_comm, nodeId ); 204 // base_overlay->joinSpoVNet( spovnetId ); 205 // 206 // // join against static bootstrap points and 207 // // start automatic bootstrapping modules 208 // vector<AribaModule::BootstrapMechanism> mechanisms 209 // = ariba_mod.getBootstrapMechanisms(vnetname); 210 // 211 // vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 212 // 213 // BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){ 214 // switch(m){ 215 // case AribaModule::BootstrapMechanismStatic: 216 // { 217 // const communication::EndpointDescriptor* ep = 218 // ariba_mod.getBootstrapNode(vnetname, m); 219 // if( ep != NULL && ep->isUnspecified() == false ) 220 // base_overlay->joinSpoVNet( spovnetId, *ep); 221 // break; 222 // } 223 // case AribaModule::BootstrapMechanismBroadcast: 224 // internalmodules.push_back(make_pair( 225 // BootstrapManager::BootstrapTypePeriodicBroadcast, 226 // ariba_mod.getBootstrapInfo(vnetname, m))); 227 // break; 228 // case AribaModule::BootstrapMechanismMulticastDNS: 229 // internalmodules.push_back(make_pair( 230 // BootstrapManager::BootstrapTypeMulticastDns, 231 // ariba_mod.getBootstrapInfo(vnetname, m))); 232 // break; 233 // case AribaModule::BootstrapMechanismSDP: 234 // internalmodules.push_back(make_pair( 235 // BootstrapManager::BootstrapTypeBluetoothSdp, 236 // ariba_mod.getBootstrapInfo(vnetname, m))); 237 // break; 238 // default: 239 // break; 240 // } 241 // } 242 // 243 // // start automatic overlay bootstrapping modules 244 // base_overlay->startBootstrapModules(internalmodules); 245 // 246 // // done 247 //} 248 // 249 //void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) { 250 // utility::OverlayParameterSet ovrpset; 251 // ovrpset.setOverlayStructure( 252 // (utility::OverlayParameterSet::_OverlayStructure) 253 // parm.getBaseOverlayType() 254 // ); 255 // 256 // spovnetId = vnetname.toSpoVNetId(); 257 // nodeId = generateNodeId(name); 258 // 259 // // start base comm if not started 260 // if( !ariba_mod.base_comm->isStarted() ) 261 // ariba_mod.base_comm->start(); 262 // 263 // // start base overlay if not started 264 // if( !base_overlay->isStarted() ) 265 // base_overlay->start( *ariba_mod.base_comm, nodeId ); 266 // 267 // base_overlay->createSpoVNet( spovnetId, ovrpset ); 268 //} 136 269 137 270 void Node::leave() { 138 271 base_overlay->stopBootstrapModules(); 139 272 base_overlay->leaveSpoVNet(); 140 ariba_mod.base_comm->stop();273 base_communication->stop(); // XXX before »base_overlay->stop()« ?? 141 274 base_overlay->stop(); 142 275 } … … 172 305 } 173 306 307 bool Node::isLinkDirect(const ariba::LinkID& lnk) const 308 { 309 return base_overlay->isLinkDirect(lnk); 310 } 311 312 int Node::getHopCount(const ariba::LinkID& lnk) const 313 { 314 return base_overlay->getHopCount(lnk); 315 } 316 317 318 319 320 /* +++++ Message sending +++++ */ 321 void Node::check_send_priority(uint8_t priority) 322 { 323 if ( priority < send_priority::HIGHEST || priority > send_priority::LOWEST ) 324 throw std::invalid_argument("Illegal priority"); 325 } 326 327 328 // +++ new interface +++ 329 const SequenceNumber& Node::sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority) 330 { 331 // check priority 332 check_send_priority(priority); 333 334 // * call base overlay * 335 return base_overlay->sendMessage(msg, lnk, priority); 336 } 337 338 // +++ legacy interface +++ 339 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) 340 { 341 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 342 343 try 344 { 345 base_overlay->sendMessage(message, lnk, send_priority::NORMAL); 346 return 0; 347 } 348 catch ( ariba::overlay::message_not_sent& e ) 349 { 350 logging_warn("Message could not be sent. Dropped."); 351 return -1; 352 } 353 } 354 355 356 // +++ new interface +++ 357 const SequenceNumber& Node::sendMessage(reboost::message_t msg, const NodeID& nid, 358 const ServiceID& sid, uint8_t priority, const LinkProperties& req) { 359 360 // check priority 361 check_send_priority(priority); 362 363 // * call base overlay * 364 return base_overlay->sendMessage(msg, nid, priority, sid); 365 } 366 367 // +++ legacy interface +++ 174 368 seqnum_t Node::sendMessage(const DataMessage& msg, const NodeID& nid, 175 369 const ServiceID& sid, const LinkProperties& req) { 176 return base_overlay->sendMessage((Message*) msg, nid, sid); 177 } 178 370 371 // reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 372 reboost::message_t message; 373 message.push_back( ((Message*) msg)->serialize_into_shared_buffer() ); 374 375 try 376 { 377 sendMessage(message, nid, sid, send_priority::NORMAL, req); 378 return 0; 379 } 380 catch ( ariba::overlay::message_not_sent& e ) 381 { 382 logging_warn("Message could not be sent. Dropped."); 383 return -1; 384 } 385 } 386 387 388 // +++ new interface +++ 389 NodeID Node::sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 390 uint8_t priority, const LinkProperties& req) { 391 392 // check priority 393 check_send_priority(priority); 394 395 // * call base overlay * 396 return base_overlay->sendMessageCloserToNodeID(msg, nid, priority, sid); 397 } 398 399 // +++ legacy interface +++ 179 400 NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 180 401 const LinkProperties& req) { 181 402 182 return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid); 183 } 184 185 186 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) { 187 return base_overlay->sendMessage((Message*) msg, lnk); 188 } 189 403 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 404 405 return sendMessageCloserToNodeID(message, nid, sid, send_priority::NORMAL, req); 406 } 407 408 409 // +++ new interface +++ 410 void Node::sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority) { 411 412 // check priority 413 check_send_priority(priority); 414 415 // * call base overlay * 416 return base_overlay->broadcastMessage(msg, sid, priority); 417 } 418 419 // +++ legacy interface +++ 190 420 void Node::sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid) { 191 return base_overlay->broadcastMessage((Message*)msg, sid); 192 } 421 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 422 423 return sendBroadcastMessage(message, sid); 424 } 425 426 427 /* +++++ [Message sending] +++++ */ 428 429 430 193 431 194 432 bool Node::bind(NodeListener* listener) { … … 204 442 bool ret = base_overlay->bind(listener, sid); 205 443 206 // now that we have a listener, we can ask if sniffing is ok207 if( ariba_mod.sideport_sniffer != NULL ){208 base_overlay->registerSidePort(ariba_mod.sideport_sniffer);209 }444 // // now that we have a listener, we can ask if sniffing is ok 445 // if( ariba_mod.sideport_sniffer != NULL ){ 446 // base_overlay->registerSidePort(ariba_mod.sideport_sniffer); 447 // } 210 448 211 449 return ret; -
source/ariba/Node.h
r10653 r12060 43 43 namespace ariba { 44 44 class Node; 45 namespace overlay { 46 class BaseOverlay; 47 } 45 namespace communication { 46 class BaseCommunication; 47 } 48 namespace overlay { 49 class BaseOverlay; 50 } 48 51 } 49 52 50 53 #include <vector> 51 54 #include <iostream> 52 #include <boost/foreach.hpp> 55 #include <exception> 56 53 57 #include "Module.h" 54 58 #include "Identifiers.h" … … 56 60 #include "NodeListener.h" 57 61 #include "Name.h" 58 #include "AribaModule.h"62 //#include "AribaModule.h" 59 63 #include "CommunicationListener.h" 60 64 #include "DataMessage.h" 61 65 #include "SideportListener.h" 66 #include "ariba/overlay/SequenceNumber.h" 67 68 // reboost messages 69 #include "ariba/utility/transport/messages/message.hpp" 70 71 #include <boost/property_tree/ptree.hpp> 62 72 63 73 using std::vector; … … 65 75 66 76 namespace ariba { 77 78 // typedef ariba::overlay::SequenceNumber SequenceNumber; // XXX see CommunicationListener 79 80 using boost::property_tree::ptree; 81 82 // sendMessage-Priorities 83 struct send_priority 84 { 85 enum SEND_PRIORITY 86 { 87 HIGHEST = 2, 88 HIGHER = 3, 89 NORMAL = 4, 90 LOWER = 5, 91 LOWEST = 6 92 }; 93 }; 94 95 67 96 68 97 /** … … 75 104 * @author Christoph Mayer <mayer@tm.uka.de> 76 105 */ 106 // TODO do we really want to inherit from Module.. ? 77 107 class Node: public Module { 78 108 public: 109 79 110 /** 80 111 * Constructs a new node using a given ariba module … … 86 117 * is a zero-terminated char-string. 87 118 */ 88 Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED); 119 // Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED); 120 121 // XXX EXPERIMENTAL 122 Node(); 89 123 90 124 /** … … 100 134 //--- node control --- 101 135 136 /** 137 * XXX EXPERIMENTAL 138 * 139 * Replaces initialte & join 140 */ 141 void connect(const ptree& config); 142 143 // XXX DEPRECATED 102 144 /** 103 145 * This method instructs the node to join a particular spovnet. … … 107 149 * @param vnetId The SpoVNet name 108 150 */ 109 void join(const Name& name);151 // void join(const Name& name); 110 152 111 153 /** … … 116 158 * @param param The SpoVNet properties 117 159 */ 118 void initiate(const Name& name, const SpoVNetProperties& parm =119 SpoVNetProperties::DEFAULT);160 // void initiate(const Name& name, const SpoVNetProperties& parm = 161 // SpoVNetProperties::DEFAULT); 120 162 121 163 /** … … 221 263 void dropLink(const LinkID& lnk); 222 264 223 // message sending 224 225 /** 226 * Sends a one-shot message to a service. If link properties are specified, 227 * the node tries to fulfill those requirements. This may cause the node 228 * to first establish a temporary link, second sending the message and last 229 * dropping the link. This would result in a small amount of extra latency 230 * until the message is delivered. If reliable transport was selected, 231 * the method returns a sequence number and a communication event is 232 * triggered on message delivery or loss. 233 * 234 * @param msg The message to be sent 235 * @param nid The remote node identifier 236 * @param sid The remote service identifier 237 * @param req The requirements associated with the message 238 * @return A sequence number 239 */ 265 /** 266 * Returns whether a link is direct or relayed over other nodes 267 * @param lnk LinkID of the link 268 * @return true if link is direct; false otherwise 269 */ 270 bool isLinkDirect(const ariba::LinkID& lnk) const; 271 272 /** 273 * Returns the latest measured hop count on this link. 274 * NOTE: This is not guaranteed to be up to date. 275 * 276 * @param lnk LinkID of the link 277 * @return overlay hop count on this link 278 */ 279 int getHopCount(const ariba::LinkID& lnk) const; 280 281 282 /* +++++ Message sending +++++ */ 283 284 285 /** 286 * Sends a message via an established link. If reliable transport was 287 * selected, the method returns a sequence number and a communication event 288 * is triggered on message delivery or loss. 289 * 290 * +++ New interface, using efficient zero-copy reboost messages. +++ 291 * 292 * @param msg The message to be sent 293 * @param lnk The link to be used for sending the message 294 */ 295 const SequenceNumber& sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority=send_priority::NORMAL); 296 297 /** 298 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 299 */ 300 seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk); 301 302 303 /** 304 * Sends a one-shot message to a service. If link properties are specified, 305 * the node tries to fulfill those requirements. This may cause the node 306 * to first establish a temporary link, second sending the message and last 307 * dropping the link. This would result in a small amount of extra latency 308 * until the message is delivered. If reliable transport was selected, 309 * the method returns a sequence number and a communication event is 310 * triggered on message delivery or loss. 311 * 312 * +++ New interface, using efficient zero-copy reboost messages. +++ 313 * 314 * @param msg The message to be sent 315 * @param nid The remote node identifier 316 * @param sid The remote service identifier 317 * @param req The requirements associated with the message 318 * @return A sequence number 319 */ 320 const SequenceNumber& sendMessage(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 321 uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT); 322 323 /** 324 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 325 */ 240 326 seqnum_t sendMessage(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 241 327 const LinkProperties& req = LinkProperties::DEFAULT); 242 328 243 /** 244 * like the above function, but sends the message to the closest directly known node 245 * to the specified address 246 */ 329 330 /** 331 * like the above function, but sends the message to the closest directly known node 332 * to the specified address 333 * 334 * +++ New interface, using efficient zero-copy reboost messages. +++ 335 * 336 */ 337 NodeID sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 338 uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT); 339 340 /** 341 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 342 */ 247 343 NodeID sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 248 344 const LinkProperties& req = LinkProperties::DEFAULT); 249 345 250 /** 251 * Sends a message via an established link. If reliable transport was 252 * selected, the method returns a sequence number and a communication event 253 * is triggered on message delivery or loss. 254 * 255 * @param msg The message to be sent 256 * @param lnk The link to be used for sending the message 257 */ 258 seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk); 259 260 /** 261 * Sends a message to all known hosts in the overlay structure 262 * the nodes that are reached here depend on the overlay structure. 263 * 264 * @param msg The message to be send 265 * @param sid The id of the service that should receive the message 266 * @see getNeighborNodes 267 */ 346 347 /** 348 * Sends a message to all known hosts in the overlay structure 349 * the nodes that are reached here depend on the overlay structure. 350 * 351 * +++ New interface, using efficient zero-copy reboost messages. +++ 352 * 353 * @param msg The message to be send 354 * @param sid The id of the service that should receive the message 355 * @see getNeighborNodes 356 */ 357 void sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority=send_priority::NORMAL); 358 359 /** 360 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 361 */ 268 362 void sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid); 269 363 364 365 /* +++++ [Message sending] +++++ */ 366 367 368 270 369 // --- communication listeners --- 271 370 … … 313 412 /** @see Module.h */ 314 413 string getName() const; 315 414 415 416 private: 417 inline void check_send_priority(uint8_t priority); 418 419 316 420 protected: 317 421 // friends … … 320 424 // member variables 321 425 Name name; //< node name 322 AribaModule&ariba_mod; //< ariba module426 // AribaModule* ariba_mod; //< ariba module 323 427 SpoVNetID spovnetId; //< current spovnet id 324 428 NodeID nodeId; //< current node id 429 communication::BaseCommunication* base_communication; 325 430 overlay::BaseOverlay* base_overlay; //< the base overlay 326 431 -
source/ariba/SideportListener.cpp
r7468 r12060 41 41 #include "ariba/overlay/BaseOverlay.h" 42 42 #include "ariba/overlay/LinkDescriptor.h" 43 #include "ariba/utility/addressing /endpoint_set.hpp"43 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 44 44 45 45 using ariba::overlay::LinkDescriptor; … … 130 130 if( overlay == NULL ) return (Protocol)ret; 131 131 132 using namespace ariba::addressing;133 134 132 LinkDescriptor* link = NULL; 135 133 BOOST_FOREACH( LinkDescriptor* lnk, overlay->links ){ … … 147 145 if(bclink.isUnspecified() || bclink.remoteLocator == NULL) return (Protocol)ret; 148 146 149 const address_v*locator = bclink.remoteLocator;147 addressing2::EndpointPtr locator = bclink.remoteLocator; 150 148 151 if( locator->instanceof<tcpip_endpoint>() ){ 152 tcpip_endpoint tcpip = *locator; 153 154 if( tcpip.address().is_v4() || tcpip.address().is_v4_mapped() ){ 149 if( locator->get_category() == addressing2::endpoint_category::TCPIP ) 150 { 151 boost::shared_ptr<addressing2::tcpip_endpoint> endp = 152 boost::dynamic_pointer_cast<addressing2::tcpip_endpoint>(locator); 153 154 if( endp->to_asio().address().is_v4() ) 155 { 155 156 ret = SideportListener::ipv4; 156 }else if( tcpip.address().is_v6() ){157 }else { 157 158 ret = SideportListener::ipv6; 158 159 } 159 160 160 }else if( locator-> instanceof<rfcomm_endpoint>()){161 }else if( locator->get_category() == addressing2::endpoint_category::BLUETOOTH ){ 161 162 ret = SideportListener::rfcomm; 162 163 } -
source/ariba/ariba.h
r3374 r12060 44 44 */ 45 45 46 #include "AribaModule.h"47 46 #include "CommunicationListener.h" 48 47 #include "DataMessage.h" -
source/ariba/communication/BaseCommunication.cpp
r10767 r12060 57 57 namespace communication { 58 58 59 using namespace ariba::addressing2; 60 59 61 using ariba::utility::PeerID; 60 62 using ariba::utility::SystemQueue; 61 63 62 64 use_logging_cpp(BaseCommunication); 63 64 /// adds an endpoint to the list65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {66 if (endpoint==NULL) return;67 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {68 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {69 ref.count++;70 return;71 }72 }73 endpoint_reference ref;74 ref.endpoint = endpoint->clone();75 ref.count = 1;76 remote_endpoints.push_back(ref);77 }78 79 /// removes an endpoint from the list80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {81 if (endpoint==NULL) return;82 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();83 i != remote_endpoints.end(); i++) {84 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {85 i->count--;86 if (i->count==0) {87 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");88 transport->terminate(i->endpoint);89 delete i->endpoint;90 remote_endpoints.erase(i);91 }92 return;93 }94 }95 }96 65 97 66 … … 100 69 transport( NULL ), 101 70 messageReceiver( NULL ), 102 started( false ) 71 started( false ), 72 listenOn_endpoints(new addressing2::endpoint_set()) 103 73 { 104 74 } … … 109 79 110 80 111 void BaseCommunication::start() { 81 void BaseCommunication::start(EndpointSetPtr listen_on) { 82 assert ( ! started ); 83 84 listenOn_endpoints = listen_on; 85 logging_info("Setting local end-points: " << listenOn_endpoints->to_string()); 86 112 87 logging_info( "Starting up ..." ); 113 88 currentSeqnum = 0; 114 89 115 // set local peer id116 localDescriptor.getPeerId() = PeerID::random();117 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );118 119 90 // creating transports 91 // ---> transport_peer holds the set of the active endpoints we're listening on 120 92 logging_info( "Creating transports ..." ); 121 122 #ifdef UNDERLAY_OMNET 123 AribaOmnetModule* module = StartupWrapper::getCurrentModule(); 124 module->setServerPort( listenport ); 125 126 transport = module; 127 network = new OmnetNetworkProtocol( module ); 128 #else 129 transport = new transport_peer( localDescriptor.getEndpoints() ); 130 #endif 93 transport = new transport_peer(); 94 active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints); 95 logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX 131 96 132 97 logging_info( "Searching for local locators ..." ); 133 /** 134 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0) 135 * since addresses are used to initialize transport addresses 136 */ 137 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() ); 138 logging_info( "Done. Local endpoints = " << localDescriptor.toString() ); 139 98 local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints); 99 if ( local_endpoints->count() > 0 ) 100 { 101 logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() ); 102 } 103 else 104 { 105 logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!"); 106 107 // TODO notify application, so that it may react properly. throw exception..? 108 assert( false ); 109 } 110 111 112 // create local EndpointDescriptor 113 // ---> localDescriptor hold the set endpoints that can be used to reach us 114 localDescriptor.getPeerId() = PeerID::random(); 115 localDescriptor.replace_endpoint_set(local_endpoints); 116 logging_info( "Using PeerID: " << localDescriptor.getPeerId() ); 117 118 // start transport_peer 140 119 transport->register_listener( this ); 141 120 transport->start(); 142 121 143 #ifndef UNDERLAY_OMNET144 122 // bind to the network change detection 145 123 networkMonitor.registerNotification( this ); 146 #endif147 124 148 125 // base comm startup done … … 163 140 bool BaseCommunication::isStarted(){ 164 141 return started; 165 }166 167 /// Sets the endpoints168 void BaseCommunication::setEndpoints( string& _endpoints ) {169 localDescriptor.getEndpoints().assign(_endpoints);170 logging_info("Setting local end-points: "171 << localDescriptor.getEndpoints().to_string());172 142 } 173 143 … … 193 163 addLink( ld ); 194 164 195 // send a message to request new link to remote 196 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 197 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid ); 198 baseMsg.getLocalDescriptor() = localDescriptor; 199 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId(); 200 201 // serialize and send message 202 send( &baseMsg, descriptor ); 165 166 /* send a message to request new link to remote */ 167 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 168 169 /* 170 * Create Link-Request Message: 171 * NOTE: - Their PeerID (in parent message) 172 * - Our LinkID 173 * - Our PeerID 174 * - Our EndpointDescriptor 175 */ 176 reboost::message_t linkmsg; 177 linkmsg.push_back(linkid.serialize()); 178 linkmsg.push_back(localDescriptor.getPeerId().serialize()); 179 linkmsg.push_back(localDescriptor.endpoints->serialize()); 180 181 // // XXX AKTUELL BUG FINDING... 182 // reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize(); 183 // EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet(); 184 // xxx_set->deserialize(xxx); 185 // cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl; 186 // cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl; 187 188 // send message 189 // TODO move enum to BaseComm 190 send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg, 191 descriptor, system_priority::OVERLAY); 203 192 204 193 return linkid; … … 217 206 218 207 // tell the registered listeners 219 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {208 foreach( CommunicationEvents* i, eventListener ) { 220 209 i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); 221 210 } 222 211 223 // create message to drop the link 212 213 // * send message to drop the link * 224 214 logging_debug( "Sending out link close request. for us, the link is closed now" ); 225 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink ); 226 227 // send message to drop the link 228 send( &msg, ld ); 215 reboost::message_t empty_message; 216 send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY ); 229 217 230 218 // remove from map … … 232 220 } 233 221 234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 235 236 logging_debug( "Sending out message to link " << lid.toString() ); 237 222 223 seqnum_t BaseCommunication::sendMessage( const LinkID& lid, 224 reboost::message_t message, 225 uint8_t priority, 226 bool bypass_overlay) throw(communication_message_not_sent) 227 { 228 // message type: direct data or (normal) data 229 AribaBaseMsg::type_ type; 230 if ( bypass_overlay ) 231 { 232 type = AribaBaseMsg::typeDirectData; 233 logging_debug( "Sending out direct-message to link " << lid.toString() ); 234 } 235 else 236 { 237 type = AribaBaseMsg::typeData; 238 logging_debug( "Sending out message to link " << lid.toString() ); 239 } 240 241 238 242 // query local link info 239 243 LinkDescriptor& ld = queryLocalLink(lid); 240 if( ld.isUnspecified() ){ 241 logging_error( "Don't know the link with id " << lid.toString() ); 242 return -1; 244 if( ld.isUnspecified() ) 245 { 246 throw communication_message_not_sent("Don't know the link with id " 247 + lid.toString()); 243 248 } 244 249 245 250 // link not up-> error 246 if( !ld.up ) {247 logging_error("Can not send on link " << lid.toString() << ": link not up");248 return -1;249 }250 251 // create message 252 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink ); 253 254 // encapsulate the payload message255 msg.encapsulate( const_cast<Message*>(message) ); 256 257 // send message258 send( &msg, ld);259 260 // return sequence number251 if( !ld.up ) 252 { 253 throw communication_message_not_sent("Can not send on link " 254 + lid.toString() + ": link not up"); 255 } 256 257 258 // * send message * 259 bool okay = send_over_link( type, message, ld, priority ); 260 261 if ( ! okay ) 262 { 263 throw communication_message_not_sent("send_over_link failed!"); 264 } 265 261 266 return ++currentSeqnum; 262 267 } … … 268 273 LinkDescriptor& linkDesc = queryLocalLink(link); 269 274 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); 270 return linkDesc.remote Endpoint;275 return linkDesc.remoteDescriptor; 271 276 } 272 277 } … … 283 288 } 284 289 285 SystemEventType TransportEvent("Transport"); 286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 287 288 /// called when a system event is emitted by system queue 289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { 290 291 // dispatch received messages 292 if ( event.getType() == MessageDispatchEvent ){ 293 logging_debug( "Forwarding message receiver" ); 294 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 295 (*handler)(); 296 delete handler; 297 } 298 } 299 300 /** 301 * called within the ASIO thread 302 * when a message is received from underlay transport 303 */ 290 291 292 /*------------------------------ 293 | ASIO thread --> SystemQueue | 294 ------------------------------*/ 295 296 /// ASIO thread 304 297 void BaseCommunication::receive_message(transport_connection::sptr connection, 305 reboost:: message_t msg) {298 reboost::shared_buffer_t msg) { 306 299 307 300 logging_debug( "Dispatching message" ); 308 301 309 boost::function0<void>* handler = new boost::function0<void>(302 SystemQueue::instance().scheduleCall( 310 303 boost::bind( 311 304 &BaseCommunication::receiveMessage, … … 313 306 connection, 314 307 msg) 315 ); 316 317 SystemQueue::instance().scheduleEvent( 318 SystemEvent(this, MessageDispatchEvent, handler) 319 ); 320 } 321 322 /** 323 * called within the ARIBA thread (System Queue) 324 * when a message is received from underlay transport 325 */ 308 ); 309 } 310 311 /// ASIO thread 312 void BaseCommunication::connection_terminated(transport_connection::sptr connection) 313 { 314 SystemQueue::instance().scheduleCall( 315 boost::bind( 316 &BaseCommunication::connectionTerminated, 317 this, 318 connection) 319 ); 320 } 321 322 /*-------------------------------- 323 | [ASIO thread --> SystemQueue] | 324 -------------------------------*/ 325 326 /// ARIBA thread (System Queue) 327 void BaseCommunication::connectionTerminated(transport_connection::sptr connection) 328 { 329 vector<LinkID*> links = connection->get_communication_links(); 330 331 logging_debug("[BaseCommunication] Connection terminated: " 332 << connection->getLocalEndpoint()->to_string() 333 << " <--> " << connection->getRemoteEndpoint()->to_string() 334 << " (" << links.size() << " links)"); 335 336 // remove all links that used the terminated connection 337 for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it ) 338 { 339 LinkID& link_id = **it; 340 341 logging_debug(" ---> Removing link: " << link_id.toString()); 342 343 // searching for link, not found-> warn 344 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 345 if (linkDesc.isUnspecified()) { 346 logging_warn("Failed to find local link " << link_id.toString()); 347 continue; 348 } 349 350 // inform listeners 351 foreach( CommunicationEvents* i, eventListener ){ 352 i->onLinkFail( linkDesc.localLink, 353 linkDesc.localLocator, linkDesc.remoteLocator ); 354 } 355 356 // remove the link descriptor 357 removeLink( link_id ); 358 } 359 } 360 361 /// ARIBA thread (System Queue) 326 362 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 327 reboost:: message_t message)363 reboost::shared_buffer_t message) 328 364 { 329 330 //// Adapt to old message system //// 331 // Copy data 332 size_t bytes_len = message.size(); 333 uint8_t* bytes = new uint8_t[bytes_len]; 334 message.read(bytes, 0, bytes_len); 335 336 Data data(bytes, bytes_len * 8); 337 338 Message legacy_message; 339 legacy_message.setPayload(data); 340 341 342 343 /// decapsulate message 344 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 345 logging_debug( "Receiving message of type " << msg->getTypeString() ); 346 365 // XXX 366 logging_debug("/// [receiveMessage] buffersize: " << message.size()); 367 368 // get type 369 uint8_t type = message.data()[0]; 370 reboost::shared_buffer_t sub_buff = message(1); 371 372 // get link id 373 LinkID link_id; 374 if ( type != AribaBaseMsg::typeLinkRequest) 375 { 376 sub_buff = link_id.deserialize(sub_buff); 377 } 378 347 379 // handle message 348 switch ( msg->getType()) {349 380 switch ( type ) 381 { 350 382 // --------------------------------------------------------------------- 351 383 // data message 352 384 // --------------------------------------------------------------------- 353 case AribaBaseMsg::typeData: { 354 logging_debug( "Received data message, forwarding to overlay" ); 355 if( messageReceiver != NULL ) { 385 case AribaBaseMsg::typeData: 386 { 387 logging_debug( "Received data message, forwarding to overlay." ); 388 if( messageReceiver != NULL ) 389 { 356 390 messageReceiver->receiveMessage( 357 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED391 sub_buff, link_id, NodeID::UNSPECIFIED, false 358 392 ); 359 393 } 394 360 395 break; 361 396 } 362 397 398 // --------------------------------------------------------------------- 399 // direct data message (bypass overlay-layer) 400 // --------------------------------------------------------------------- 401 case AribaBaseMsg::typeDirectData: 402 { 403 logging_debug( "Received direct data message, forwarding to application." ); 404 405 if( messageReceiver != NULL ) 406 { 407 messageReceiver->receiveMessage( 408 sub_buff, link_id, NodeID::UNSPECIFIED, true 409 ); 410 } 411 412 break; 413 } 414 415 416 363 417 // --------------------------------------------------------------------- 364 418 // handle link request from remote 365 419 // --------------------------------------------------------------------- 366 case AribaBaseMsg::typeLinkRequest: { 367 logging_debug( "Received link open request" ); 368 369 /// not the correct peer id-> skip request 370 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified() 371 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) { 372 logging_info("Received link request for " 373 << msg->getRemoteDescriptor().getPeerId().toString() 374 << "but i'm " 375 << localDescriptor.getPeerId() 376 << ": Ignoring!"); 377 break; 378 } 379 420 case AribaBaseMsg::typeLinkRequest: 421 { 422 logging_debug( "Received link open request on " 423 << connection->getLocalEndpoint()->to_string() ); 424 425 /* 426 * Deserialize Peer Message 427 * - Our PeerID 428 */ 429 PeerID our_peer_id; 430 sub_buff = our_peer_id.deserialize(sub_buff); 431 432 /// not the correct peer id-> skip request 433 if ( our_peer_id != localDescriptor.getPeerId() && 434 ! our_peer_id.isUnspecified() /* overlay bootstrap */ ) 435 { 436 logging_info("Received link request for " 437 << our_peer_id.toString() 438 << "but i'm " 439 << localDescriptor.getPeerId() 440 << ": Ignoring!"); 441 442 // TODO terminate connection? 443 444 break; 445 } 446 447 448 /* 449 * Deserialize Link-Request Message: 450 * - Their LinkID 451 * - Their PeerID 452 * - Their EndpointDescriptor 453 */ 454 LinkID their_link_id; 455 PeerID their_peer_id; 456 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 457 sub_buff = their_link_id.deserialize(sub_buff); 458 sub_buff = their_peer_id.deserialize(sub_buff); 459 sub_buff = their_endpoints->deserialize(sub_buff); 460 /* [ Deserialize Link-Request Message ] */ 461 462 380 463 /// only answer the first request 381 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) { 464 if (!queryRemoteLink(their_link_id).isUnspecified()) 465 { 466 467 // TODO aktuell: When will these connections be closed? 468 // ---> Close it now (if it services no links) ? 469 // (see also ! allowlink below) 470 471 // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only! 472 if ( connection->get_communication_links().size() == 0 ) 473 { 474 connection->terminate(); 475 } 476 382 477 logging_debug("Link request already received. Ignore!"); 383 478 break; … … 386 481 /// create link ids 387 482 LinkID localLink = LinkID::create(); 388 LinkID remoteLink = msg->getLocalLink();483 LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary 389 484 logging_debug( 390 485 "local=" << connection->getLocalEndpoint()->to_string() … … 394 489 // check if link creation is allowed by ALL listeners 395 490 bool allowlink = true; 396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){491 foreach( CommunicationEvents* i, eventListener ){ 397 492 allowlink &= i->onLinkRequest( localLink, 398 493 connection->getLocalEndpoint(), … … 403 498 if( !allowlink ){ 404 499 logging_warn( "Overlay denied creation of link" ); 405 delete msg;406 500 return; 407 501 } … … 411 505 ld->localLink = localLink; 412 506 ld->remoteLink = remoteLink; 413 ld->localLocator = connection->getLocalEndpoint()->clone(); 414 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 415 ld->connection = connection; 416 ld->remoteEndpoint = msg->getLocalDescriptor(); 417 add_endpoint(ld->remoteLocator); 418 419 // add layer 1-3 addresses 420 ld->remoteEndpoint.getEndpoints().add( 421 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 422 localDescriptor.getEndpoints().add( 423 connection->getLocalEndpoint(), 424 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 507 ld->localLocator = connection->getLocalEndpoint(); 508 ld->remoteLocator = connection->getRemoteEndpoint(); 509 ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints); 510 ld->set_connection(connection); 511 512 513 // update endpoints (should only have any effect in case of NAT) 514 ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 515 // localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0 425 516 426 517 // link is now up-> add it … … 428 519 addLink(ld); 429 520 430 // link is up! 431 logging_debug( "Link (initiated from remote) is up with " 432 << "local(id=" << ld->localLink.toString() << "," 433 << "locator=" << ld->localLocator->to_string() << ") " 434 << "remote(id=" << ld->remoteLink.toString() << ", " 435 << "locator=" << ld->remoteLocator->to_string() << ")" 436 ); 437 438 // sending link request reply 439 logging_debug( "Sending link request reply with ids " 440 << "local=" << localLink.toString() << ", " 441 << "remote=" << remoteLink.toString() ); 442 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink ); 443 reply.getLocalDescriptor() = localDescriptor; 444 reply.getRemoteDescriptor() = ld->remoteEndpoint; 445 446 send( &reply, *ld ); 521 522 523 /* sending link reply */ 524 logging_debug( "Sending link reply with ids " 525 << "local=" << localLink.toString() << ", " 526 << "remote=" << remoteLink.toString() ); 527 528 /* 529 * Create Link-Reply Message: 530 * - Our LinkID 531 * - Our Endpoint_Set (as update) 532 * - Their EndpointDescriptor (maybe they learn something about NAT) 533 */ 534 reboost::message_t linkmsg; 535 linkmsg.push_back(localLink.serialize()); 536 linkmsg.push_back(localDescriptor.endpoints->serialize()); 537 linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize()); 538 539 // XXX 540 cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl; 541 542 // send message 543 bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY ); 544 545 if ( ! sent ) 546 { 547 logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string()); 548 549 // TODO remove link, close link, ..? 550 551 break; 552 } 553 554 555 // link is up! 556 logging_debug( "Link (initiated from remote) is up with " 557 << "local(id=" << ld->localLink.toString() << "," 558 << "locator=" << ld->localLocator->to_string() << ") " 559 << "remote(id=" << ld->remoteLink.toString() << ", " 560 << "locator=" << ld->remoteLocator->to_string() << ")" 561 ); 447 562 448 563 // inform listeners about new open link 449 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {564 foreach( CommunicationEvents* i, eventListener ) { 450 565 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); 451 566 } … … 458 573 // handle link request reply 459 574 // --------------------------------------------------------------------- 460 case AribaBaseMsg::typeLinkReply: { 575 case AribaBaseMsg::typeLinkReply: 576 { 461 577 logging_debug( "Received link open reply for a link we initiated" ); 462 578 579 /* 580 * Deserialize Link-Reply Message: 581 * - Their LinkID 582 * - Their Endpoint_Set (as update) 583 * - Our EndpointDescriptor (maybe we can learn something about NAT) 584 */ 585 LinkID their_link_id; 586 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 587 EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet(); 588 sub_buff = their_link_id.deserialize(sub_buff); 589 sub_buff = their_endpoints->deserialize(sub_buff); 590 sub_buff = our_endpoints->deserialize(sub_buff); 591 592 463 593 // this is a reply to a link open request, so we have already 464 594 // a link mapping and can now set the remote link to valid 465 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink());595 LinkDescriptor& ld = queryLocalLink( link_id ); 466 596 467 597 // no link found-> warn! 468 598 if (ld.isUnspecified()) { 469 logging_warn("Failed to find local link " << msg->getRemoteLink().toString()); 470 delete msg; 599 logging_warn("Failed to find local link " << link_id.toString()); 471 600 return; 472 601 } 602 603 if ( ld.up ) 604 { 605 logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString()); 606 607 // TODO send LinkClose ? 608 return; 609 } 473 610 474 611 // store the connection 475 ld. connection = connection;612 ld.set_connection(connection); 476 613 477 614 // set remote locator and link id 478 ld.remoteLink = msg->getLocalLink(); 479 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 480 ld.remoteEndpoint.getEndpoints().add( 481 msg->getLocalDescriptor().getEndpoints(), 482 endpoint_set::Layer1_4 483 ); 484 485 localDescriptor.getEndpoints().add( 486 msg->getRemoteDescriptor().getEndpoints(), 487 endpoint_set::Layer1_3 488 ); 615 ld.remoteLink = their_link_id; 616 ld.remoteLocator = connection->getRemoteEndpoint(); 617 618 619 /* Update endpoints */ 620 // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information. 621 ld.remoteDescriptor.replace_endpoint_set(their_endpoints); 622 623 // add actual remote endpoint to this set (should only have any effect in case of NAT) 624 ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 625 626 // TODO In case of NAT, we could learn something about our external IP. 627 // ---> But we must trust the remote peer about this information!! 628 // localDescriptor.endpoints->add_endpoints(our_endpoints); 629 630 631 632 489 633 ld.up = true; 490 add_endpoint(ld.remoteLocator);491 634 492 635 logging_debug( "Link is now up with local id " … … 496 639 497 640 // inform lisneters about link up event 498 BOOST_FOREACH( CommunicationEvents* i, eventListener ){641 foreach( CommunicationEvents* i, eventListener ){ 499 642 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); 500 643 } … … 509 652 case AribaBaseMsg::typeLinkClose: { 510 653 // get remote link 511 const LinkID& localLink = msg->getRemoteLink();512 logging_debug( "Received link close request for link " << l ocalLink.toString() );654 // const LinkID& localLink = msg.getRemoteLink(); 655 logging_debug( "Received link close request for link " << link_id.toString() ); 513 656 514 657 // searching for link, not found-> warn 515 LinkDescriptor& linkDesc = queryLocalLink( l ocalLink);658 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 516 659 if (linkDesc.isUnspecified()) { 517 logging_warn("Failed to find local link " << localLink.toString()); 518 delete msg; 660 logging_warn("Failed to find local link " << link_id.toString()); 519 661 return; 520 662 } 521 663 522 664 // inform listeners 523 BOOST_FOREACH( CommunicationEvents* i, eventListener ){665 foreach( CommunicationEvents* i, eventListener ){ 524 666 i->onLinkDown( linkDesc.localLink, 525 667 linkDesc.localLocator, linkDesc.remoteLocator ); … … 527 669 528 670 // remove the link descriptor 529 removeLink( l ocalLink);671 removeLink( link_id ); 530 672 531 673 // done … … 534 676 535 677 // --------------------------------------------------------------------- 536 // handle link locator changes 678 // handle link locator changes -- TODO is this ever called..? 537 679 // --------------------------------------------------------------------- 538 case AribaBaseMsg::typeLinkUpdate: { 539 const LinkID& localLink = msg->getRemoteLink(); 540 logging_debug( "Received link update for link " 541 << localLink.toString() ); 542 543 // find the link description 544 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 545 if (linkDesc.isUnspecified()) { 546 logging_warn("Failed to update local link " 547 << localLink.toString()); 548 delete msg; 549 return; 550 } 551 552 // update the remote locator 553 const address_v* oldremote = linkDesc.remoteLocator; 554 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 555 556 // inform the listeners (local link has _not_ changed!) 557 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 558 i->onLinkChanged( 559 linkDesc.localLink, // linkid 560 linkDesc.localLocator, // old local 561 linkDesc.localLocator, // new local 562 oldremote, // old remote 563 linkDesc.remoteLocator // new remote 564 ); 565 } 566 567 // done 568 break; 569 } 570 } 571 572 delete msg; 680 // case AribaBaseMsg::typeLinkUpdate: { 681 // const LinkID& localLink = msg.getRemoteLink(); 682 // logging_debug( "Received link update for link " 683 // << localLink.toString() ); 684 // 685 // // find the link description 686 // LinkDescriptor& linkDesc = queryLocalLink( localLink ); 687 // if (linkDesc.isUnspecified()) { 688 // logging_warn("Failed to update local link " 689 // << localLink.toString()); 690 // return; 691 // } 692 // 693 // // update the remote locator 694 // addressing2::EndpointPtr oldremote = linkDesc.remoteLocator; 695 // linkDesc.remoteLocator = connection->getRemoteEndpoint(); 696 // 697 // // TODO update linkDesc.connection ? 698 // 699 // // inform the listeners (local link has _not_ changed!) 700 // foreach( CommunicationEvents* i, eventListener ){ 701 // i->onLinkChanged( 702 // linkDesc.localLink, // linkid 703 // linkDesc.localLocator, // old local 704 // linkDesc.localLocator, // new local 705 // oldremote, // old remote 706 // linkDesc.remoteLocator // new remote 707 // ); 708 // } 709 // 710 // // done 711 // break; 712 // } 713 714 715 default: { 716 logging_warn( "Received unknown message type!" ); 717 break; 718 } 719 720 } 573 721 } 574 722 … … 582 730 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ 583 731 if( (*i)->localLink != localLink) continue; 584 remove_endpoint((*i)->remoteLocator); 732 // remove_endpoint((*i)->remoteLocator); // XXX 585 733 delete *i; 586 734 linkSet.erase( i ); … … 605 753 } 606 754 607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {608 LinkIDs ids;609 for (size_t i=0; i<linkSet.size(); i++){610 if( addr == NULL ){611 ids.push_back( linkSet[i]->localLink );612 } else {613 if ( *linkSet[i]->remoteLocator == *addr )614 ids.push_back( linkSet[i]->localLink );615 }616 }617 return ids;618 }755 //LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { 756 // LinkIDs ids; 757 // for (size_t i=0; i<linkSet.size(); i++){ 758 // if( addr == NULL ){ 759 // ids.push_back( linkSet[i]->localLink ); 760 // } else { 761 // if ( *linkSet[i]->remoteLocator == *addr ) 762 // ids.push_back( linkSet[i]->localLink ); 763 // } 764 // } 765 // return ids; 766 //} 619 767 620 768 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){ 621 622 #ifdef UNDERLAY_OMNET623 624 // we have no mobility support for simulations625 return626 627 #endif // UNDERLAY_OMNET628 769 629 770 /*- disabled! … … 762 903 } 763 904 764 /// sends a message to all end-points in the end-point descriptor 765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 766 Data data = data_serialize(legacy_message, DEFAULT_V); 767 768 //// Adapt to new message system //// 769 // transfer data buffer ownership to the shared_buffer 770 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 771 772 reboost::message_t message; 773 message.push_back(buf); 774 775 transport->send(endpoint.getEndpoints(), message); 776 } 777 778 /// sends a message to the remote locator inside the link descriptor 779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 780 if (desc.remoteLocator==NULL) return; 781 782 Data data = data_serialize(legacy_message, DEFAULT_V); 783 784 //// Adapt to new message system //// 785 // transfer data buffer ownership to the shared_buffer 786 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 787 788 reboost::message_t message; 789 message.push_back(buf); 790 791 desc.connection->send(message); 792 } 905 906 addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link( 907 const LinkID& linkid) 908 { 909 LinkDescriptor& ld = queryLocalLink(linkid); 910 911 return ld.get_connection()->getLocalEndpoint(); 912 } 913 914 addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link( 915 const LinkID& linkid) 916 { 917 LinkDescriptor& ld = queryLocalLink(linkid); 918 919 return ld.get_connection()->getRemoteEndpoint(); 920 } 921 922 923 924 bool BaseCommunication::send_over_link( 925 const uint8_t type, 926 reboost::message_t message, 927 const LinkDescriptor& desc, 928 const uint8_t priority) 929 { 930 /* 931 * Create Link Message: 932 * - Type 933 * - Their LinkID 934 */ 935 // link id 936 message.push_front(desc.remoteLink.serialize()); 937 // type 938 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 939 /* [ Create Link Message ] */ 940 941 942 /* send message */ 943 transport_connection::sptr conn = desc.get_connection(); 944 if ( ! conn ) 945 { 946 cout << "/// MARIO: No connection!!" << endl; // XXX debug 947 return false; 948 } 949 950 // * send over connection * 951 return conn->send(message, priority); 952 } 953 954 void BaseCommunication::send_to_peer( 955 const uint8_t type, 956 const PeerID& peer_id, 957 reboost::message_t message, 958 const EndpointDescriptor& endpoint, 959 const uint8_t priority ) 960 { 961 /* 962 * Create Peer Message: 963 * - Type 964 * - Their PeerID 965 */ 966 // peer id 967 message.push_front(peer_id.serialize()); 968 // type 969 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 970 971 972 /* send message */ 973 transport->send(endpoint.getEndpoints(), message, priority); 974 } 975 976 977 793 978 794 979 }} // namespace ariba, communication -
source/ariba/communication/BaseCommunication.h
r10653 r12060 50 50 #include <boost/foreach.hpp> 51 51 52 #ifdef ECLIPSE_PARSER 53 #define foreach(a, b) for(a : b) 54 #else 55 #define foreach(a, b) BOOST_FOREACH(a, b) 56 #endif 57 52 58 // utilities 53 59 #include "ariba/utility/types.h" 54 #include "ariba/utility/messages .h"60 #include "ariba/utility/messages/MessageReceiver.h" 55 61 #include "ariba/utility/logging/Logging.h" 56 62 #include "ariba/utility/misc/Demultiplexer.hpp" … … 58 64 59 65 // new transport and addressing 60 #include "ariba/utility/addressing/addressing.hpp" 61 #include "ariba/utility/transport/transport.hpp" 62 #include "ariba/utility/transport/transport_connection.hpp" 66 #include "ariba/utility/transport/transport_peer.hpp" 67 #include "ariba/utility/transport/interfaces/transport_connection.hpp" 68 #include "ariba/utility/transport/interfaces/transport_listener.hpp" 69 #include "ariba/utility/addressing2/endpoint.hpp" 63 70 64 71 // communication … … 72 79 #include "ariba/communication/networkinfo/NetworkInformation.h" 73 80 74 // disabled75 //#ifndef UNDERLAY_OMNET76 // #include "ariba/communication/modules/transport/tcp/TCPTransport.h"77 // #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"78 // using ariba::communication::IPv4NetworkProtocol;79 // using ariba::communication::TCPTransport;80 //#endif81 82 81 namespace ariba { 83 class SideportListener;82 class SideportListener; 84 83 } 85 84 … … 87 86 namespace communication { 88 87 88 89 class communication_message_not_sent: public std::runtime_error 90 { 91 public: 92 /** Takes a character string describing the error. */ 93 explicit communication_message_not_sent(const string& __arg) : 94 std::runtime_error(__arg) 95 { 96 } 97 98 virtual ~communication_message_not_sent() throw() {} 99 }; 100 101 102 89 103 using namespace std; 90 using namespace ariba::addressing;91 104 using namespace ariba::transport; 92 105 using namespace ariba::utility; … … 102 115 * protocols and addressing schemes. 103 116 * 104 * @author Sebastian Mies, Christoph Mayer 117 * @author Sebastian Mies, Christoph Mayer, Mario Hock 105 118 */ 106 119 class BaseCommunication: 107 120 public NetworkChangeInterface, 108 public SystemEventListener,109 121 public transport_listener { 110 122 … … 120 132 121 133 /// Startup the base communication, start modules etc. 122 void start( );134 void start(addressing2::EndpointSetPtr listen_on); 123 135 124 136 /// Stops the base communication, stop modules etc. 125 137 void stop(); 126 127 /// Sets the endpoints128 void setEndpoints( string& endpoints );129 138 130 139 /// Check whether the base communication has been started up … … 147 156 * @return A sequence number for this message 148 157 */ 149 seqnum_t sendMessage(const LinkID lid, const Message* message); 158 seqnum_t sendMessage(const LinkID& lid, 159 reboost::message_t message, 160 uint8_t priority, 161 bool bypass_overlay = false) throw(communication_message_not_sent); 150 162 151 163 /** … … 164 176 * @return List of LinkID 165 177 */ 166 LinkIDs getLocalLinks(const address_v* addr) const; 178 // LinkIDs getLocalLinks(const address_v* addr) const; // XXX aktuell 167 179 168 180 /** … … 187 199 188 200 void unregisterEventListener(CommunicationEvents* _events); 189 190 /// called when a system event is emitted by system queue191 virtual void handleSystemEvent(const SystemEvent& event);192 201 193 202 /** … … 196 205 */ 197 206 virtual void receive_message(transport_connection::sptr connection, 198 reboost::message_t msg); 199 207 reboost::shared_buffer_t msg); 208 209 /** 210 * called within the ASIO thread 211 * when a connection is terminated (e.g. TCP close) 212 */ 213 virtual void connection_terminated(transport_connection::sptr connection); 214 215 addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid); 216 addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid); 217 218 200 219 protected: 201 220 … … 205 224 */ 206 225 void receiveMessage(transport_connection::sptr connection, 207 reboost::message_t msg); 226 reboost::shared_buffer_t message); 227 228 /** 229 * called within the ARIBA thread (System Queue) 230 * when a connection is terminated (e.g. TCP close) 231 */ 232 void connectionTerminated(transport_connection::sptr connection); 233 208 234 209 235 /// called when a network interface change happens … … 221 247 /// default constructor 222 248 LinkDescriptor() : 223 localLink(LinkID::UNSPECIFIED), localLocator(NULL),224 remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),249 localLink(LinkID::UNSPECIFIED), 250 remoteLink(LinkID::UNSPECIFIED), 225 251 up(false) { 226 252 } 227 253 228 ~LinkDescriptor() { 229 if (localLocator!=NULL) delete localLocator; 230 if (remoteLocator!=NULL) delete remoteLocator; 254 ~LinkDescriptor() 255 { 256 if ( connection ) 257 { 258 connection->unregister_communication_link(&localLink); 259 } 231 260 } 232 261 … … 240 269 return *unspec; 241 270 } 271 272 273 transport_connection::sptr get_connection() const 274 { 275 return connection; 276 } 277 278 void set_connection(const transport_connection::sptr& conn) 279 { 280 // unregister from old connection, 281 // if any (but normally there shouldn't..) 282 if ( connection ) 283 { 284 connection->unregister_communication_link(&localLink); 285 } 286 287 // * set_connection * 288 connection = conn; 289 290 // register this link with the connection 291 conn->register_communication_link(&localLink); 292 } 242 293 243 294 bool unspecified; … … 245 296 /// link identifiers 246 297 LinkID localLink; 247 const address_v*localLocator;298 addressing2::EndpointPtr localLocator; 248 299 249 300 /// used underlay addresses for the link 250 301 LinkID remoteLink; 251 const address_v*remoteLocator;302 addressing2::EndpointPtr remoteLocator; 252 303 253 304 /// the remote end-point descriptor 254 EndpointDescriptor remote Endpoint;305 EndpointDescriptor remoteDescriptor; 255 306 256 307 /// flag, whether this link is up 257 308 bool up; 258 309 310 311 private: 259 312 /// connection if link is up 260 313 transport_connection::sptr connection; … … 281 334 /// The local end-point descriptor 282 335 EndpointDescriptor localDescriptor; 283 284 #ifndef UNDERLAY_OMNET 336 337 /** 338 * endpoint_set holding the addresses of the "server"-sockets, 339 * ---> that should be opened 340 * 341 * (e.g. 0.0.0.0:41322) 342 */ 343 addressing2::EndpointSetPtr listenOn_endpoints; 344 345 /** 346 * endpoint_set holding the addresses of the "server"-sockets, 347 * ---> that are actually open 348 * 349 * (e.g. 0.0.0.0:41322) 350 * 351 * XXX should only be in transport_peer 352 */ 353 addressing2::EndpointSetPtr active_listenOn_endpoints; 354 355 /** 356 * endpoint_set holding the addresses of the "server"-sockets, 357 * ---> here the discovered "addressable" addresses are stored 358 * 359 * (e.g. 192.168.0.5:41322) 360 * 361 * XXX should only be in localDescriptor 362 */ 363 addressing2::EndpointSetPtr local_endpoints; 364 285 365 /// network change detector 286 366 NetworkChangeDetection networkMonitor; 287 #endif288 367 289 368 /// list of all remote addresses of links to end-points 290 class endpoint_reference { 291 public: 292 int count; ///< the number of open links to this end-point 293 const address_v* endpoint; ///< the end-point itself 294 }; 295 vector<endpoint_reference> remote_endpoints; 296 297 /// adds an end-point to the list 298 void add_endpoint( const address_v* endpoint ); 299 300 /// removes an end-point from the list 301 void remove_endpoint( const address_v* endpoint ); 369 // XXX DEPRECATED 370 // class endpoint_reference { 371 // public: 372 // int count; ///< the number of open links to this end-point 373 // const address_v* endpoint; ///< the end-point itself 374 // }; 375 // vector<endpoint_reference> remote_endpoints; 376 377 // XXX DEPRECATED 378 // /// adds an end-point to the list 379 // void add_endpoint( const address_v* endpoint ); 380 // 381 // /// removes an end-point from the list 382 // void remove_endpoint( const address_v* endpoint ); 302 383 303 384 /// event listener … … 314 395 MessageReceiver* messageReceiver; 315 396 316 /// convenience: send message to peer 317 void send( Message* message, const EndpointDescriptor& endpoint ); 318 void send( Message* message, const LinkDescriptor& descriptor ); 397 398 /* 399 * Sends a message over an existing link. 400 * ---> Adds »Link Message« Header 401 */ 402 bool send_over_link( 403 const uint8_t type, 404 reboost::message_t message, 405 const LinkDescriptor& desc, 406 const uint8_t priority); 407 408 /* 409 * Sends a message to a known peer. (To all known endpoints.) 410 * ---> Adds »Peer Message« Header 411 */ 412 void send_to_peer( 413 const uint8_t type, 414 const PeerID& peer_id, 415 reboost::message_t message, 416 const EndpointDescriptor& endpoint, 417 const uint8_t priority ); 418 319 419 320 420 /// state of the base communication -
source/ariba/communication/CommunicationEvents.cpp
r5284 r12060 49 49 50 50 bool CommunicationEvents::onLinkRequest(const LinkID& id, 51 const address_v* local, const address_v* remote) { 51 const addressing2::EndpointPtr local, 52 const addressing2::EndpointPtr remote) 53 { 52 54 return true; 53 55 } 54 56 55 void CommunicationEvents::onLinkUp(const LinkID& id, const address_v* local, 56 const address_v* remote) { 57 void CommunicationEvents::onLinkUp(const LinkID& id, 58 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 59 { 57 60 } 58 61 59 void CommunicationEvents::onLinkDown(const LinkID& id, const address_v* local, 60 const address_v* remote) { 62 void CommunicationEvents::onLinkDown(const LinkID& id, 63 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 64 { 61 65 } 62 66 63 67 void CommunicationEvents::onLinkChanged(const LinkID& id, 64 const address_v* oldlocal, const address_v* newlocal, 65 const address_v* oldremote, const address_v* newremote) { 68 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 69 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 70 { 66 71 } 67 72 68 void CommunicationEvents::onLinkFail(const LinkID& id, const address_v* local, 69 const address_v* remote) { 73 void CommunicationEvents::onLinkFail(const LinkID& id, 74 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 75 { 70 76 } 71 77 72 78 void CommunicationEvents::onLinkQoSChanged(const LinkID& id, 73 const address_v* local, const address_v* remote, const QoSParameterSet& qos) { 79 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 80 const QoSParameterSet& qos) 81 { 74 82 } 75 83 -
source/ariba/communication/CommunicationEvents.h
r5284 r12060 42 42 #include "ariba/utility/types/LinkID.h" 43 43 #include "ariba/utility/types/QoSParameterSet.h" 44 #include "ariba/utility/addressing /addressing.hpp"44 #include "ariba/utility/addressing2/endpoint.hpp" 45 45 46 46 namespace ariba { … … 49 49 using ariba::utility::LinkID; 50 50 using ariba::utility::QoSParameterSet; 51 using namespace ariba::addressing;52 51 53 52 class CommunicationEvents { … … 68 67 * @return True, if the link should be established 69 68 */ 70 virtual bool onLinkRequest(const LinkID& id, const address_v* local, 71 const address_v* remote); 69 virtual bool onLinkRequest(const LinkID& id, 70 const addressing2::EndpointPtr local, 71 const addressing2::EndpointPtr remote); 72 72 73 73 /** … … 77 77 * @param id The link id of the established link 78 78 */ 79 virtual void onLinkUp(const LinkID& id, const address_v* local,80 const address_v*remote);79 virtual void onLinkUp(const LinkID& id, 80 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 81 81 82 82 /** … … 85 85 * @param id The link identifier of the dropped link 86 86 */ 87 virtual void onLinkDown(const LinkID& id, const address_v* local,88 const address_v*remote);87 virtual void onLinkDown(const LinkID& id, 88 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 89 89 90 90 /** … … 97 97 */ 98 98 virtual void onLinkChanged(const LinkID& id, 99 const address_v* oldlocal, const address_v* newlocal, 100 const address_v* oldremote, const address_v* newremote 101 ); 99 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 100 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote); 102 101 103 virtual void onLinkFail(const LinkID& id, const address_v* local,104 const address_v*remote);102 virtual void onLinkFail(const LinkID& id, 103 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 105 104 106 virtual void onLinkQoSChanged(const LinkID& id, const address_v* local, 107 const address_v* remote, const QoSParameterSet& qos); 105 virtual void onLinkQoSChanged(const LinkID& id, 106 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 107 const QoSParameterSet& qos); 108 108 }; 109 109 -
source/ariba/communication/EndpointDescriptor.cpp
r5624 r12060 42 42 namespace communication { 43 43 44 vsznDefault(EndpointDescriptor);44 //vsznDefault(EndpointDescriptor); 45 45 46 EndpointDescriptor::EndpointDescriptor(){ 46 EndpointDescriptor::EndpointDescriptor() : 47 endpoints(new addressing2::endpoint_set()) 48 { 47 49 } 48 50 … … 55 57 } 56 58 57 EndpointDescriptor::EndpointDescriptor(const endpoint_set& endpoints ) : 58 endpoints(endpoints){ 59 EndpointDescriptor::EndpointDescriptor( 60 const PeerID& peer_id, 61 addressing2::EndpointSetPtr endpoints ) : 62 peerId(peer_id), 63 endpoints(endpoints) 64 { 59 65 } 60 66 61 EndpointDescriptor::EndpointDescriptor(const string& str) : endpoints(str){ 67 EndpointDescriptor::EndpointDescriptor(const string& str) : 68 endpoints(new addressing2::endpoint_set()) 69 { 70 cout << "ERROR: Construction of EndpointDescriptor from String is not functional!!" << endl; 71 assert( false ); 62 72 } 63 73 74 75 reboost::message_t EndpointDescriptor::serialize() const 76 { 77 reboost::message_t msg; 78 msg.push_back(peerId.serialize()); 79 msg.push_back(endpoints->serialize()); 80 81 return msg; 82 } 83 84 reboost::shared_buffer_t EndpointDescriptor::deserialize(reboost::shared_buffer_t buff) 85 { 86 buff = peerId.deserialize(buff); 87 buff = endpoints->deserialize(buff); 88 89 return buff; 90 } 91 92 64 93 }} // namespace ariba, communication -
source/ariba/communication/EndpointDescriptor.h
r7744 r12060 42 42 #include <string> 43 43 #include <set> 44 #include "ariba/utility/serialization.h"44 //#include "ariba/utility/serialization.h" 45 45 #include "ariba/utility/types/PeerID.h" 46 #include "ariba/utility/addressing/endpoint_set.hpp" 46 47 #include "ariba/utility/addressing2/endpoint_set.hpp" 48 49 // reboost messages 50 #include "ariba/utility/transport/messages/message.hpp" 51 47 52 48 53 namespace ariba { … … 51 56 using_serialization; 52 57 using namespace std; 53 using namespace ariba::addressing;54 58 using ariba::utility::PeerID; 55 59 56 class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 57 friend class BaseCommunication; 60 61 /** 62 * This class is used a transitions helper between the old addressing and 63 * serialization to the new addressing2 and the new message classes 64 * 65 * Maybe it will be replaced, or at least modified in the future. 66 */ 67 //class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 68 // friend class BaseCommunication; 69 class EndpointDescriptor 70 { 71 friend class BaseCommunication; 58 72 59 73 public: … … 68 82 69 83 /// construct end-points from an endpoint set 70 EndpointDescriptor(const endpoint_set&endpoints );84 EndpointDescriptor(const PeerID& peer_id, addressing2::EndpointSetPtr endpoints ); 71 85 86 // FIXME NOT WORKING !! 72 87 /// construct end-points from a string 73 88 EndpointDescriptor(const string& str); … … 75 90 /// convert end-points to string 76 91 string toString() const { 77 return endpoints .to_string();92 return endpoints->to_string(); 78 93 } 79 94 … … 90 105 } 91 106 92 /// create endpoint93 static EndpointDescriptor* fromString(string str) {94 return new EndpointDescriptor(str);95 }107 // /// create endpoint 108 // static EndpointDescriptor* fromString(string str) { 109 // return new EndpointDescriptor(str); 110 // } 96 111 97 112 bool operator==(const EndpointDescriptor& rh) const { … … 113 128 114 129 /// returns the end-points of this descriptor 115 endpoint_set& getEndpoints(){130 addressing2::const_EndpointSetPtr getEndpoints() const { 116 131 return endpoints; 117 132 } 118 119 /// returns the end-points of this descriptor120 const endpoint_set& getEndpoints() const{121 returnendpoints;133 134 void replace_endpoint_set(addressing2::EndpointSetPtr new_endpoints) 135 { 136 endpoints = new_endpoints; 122 137 } 123 138 124 139 /// returns a reference to the peer id 125 140 PeerID& getPeerId() { … … 132 147 return peerId; 133 148 } 149 150 /// returns a message with peerId and endpoints in it 151 reboost::message_t serialize() const; 152 153 /// deserialite peerId and endpoints 154 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff); 155 134 156 private: 135 endpoint_setendpoints;157 addressing2::EndpointSetPtr endpoints; 136 158 PeerID peerId; 137 159 }; … … 139 161 }} // namespace ariba, communication 140 162 141 sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 142 143 // serialize peer id 144 X && &peerId; 145 146 // serialize end-points 147 uint16_t len = endpoints.to_bytes_size(); 148 X && len; 149 uint8_t* buffer = X.bytes( len ); 150 if (buffer!=NULL) { 151 if (X.isDeserializer()) endpoints.assign(buffer,len); 152 else endpoints.to_bytes(buffer); 153 } 154 }sznEnd(); 163 //sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 164 // 165 // // TODO 166 // assert(false); 167 // 168 // // serialize peer id 169 // X && &peerId; 170 // 171 // // serialize end-points 172 // uint16_t len = endpoints.to_bytes_size(); 173 // X && len; 174 // uint8_t* buffer = X.bytes( len ); 175 // if (buffer!=NULL) { 176 // if (X.isDeserializer()) endpoints.assign(buffer,len); 177 // else endpoints.to_bytes(buffer); 178 // } 179 //}sznEnd(); 155 180 156 181 #endif /*ENDPOINTDESCRIPTOR_H_*/ -
source/ariba/communication/messages/AribaBaseMsg.h
r9694 r12060 42 42 #include <string> 43 43 #include <boost/cstdint.hpp> 44 #include "ariba/utility/messages.h" 44 //#include "ariba/utility/messages.h" 45 #include "ariba/utility/messages/Message.h" 45 46 #include "ariba/utility/serialization.h" 46 47 #include "ariba/utility/types/LinkID.h" … … 61 62 using_serialization; 62 63 64 // XXX This whole message is DEPRECATED 63 65 class AribaBaseMsg : public Message { 64 66 VSERIALIZEABLE; … … 69 71 typeLinkReply = 2, 70 72 typeLinkClose = 3, 71 typeLinkUpdate = 4 73 typeLinkUpdate = 4, 74 typeDirectData = 5 72 75 }; 73 76 … … 115 118 116 119 sznBeginDefault( ariba::communication::AribaBaseMsg, X ) { 117 X && type && & localLink && &remoteLink;120 X && type && &remoteLink; 118 121 if (type == typeLinkReply || type == typeLinkRequest) 119 X && localDescriptor && remoteDescriptor;120 X && Payload();122 X && &localLink && localDescriptor && remoteDescriptor; 123 // X && Payload(); 121 124 } sznEnd(); 122 125 -
source/ariba/communication/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(AribaBaseMsg.h)39 #add_headers(AribaBaseMsg.h) 40 40 41 add_sources(AribaBaseMsg.cpp)41 #add_sources(AribaBaseMsg.cpp) -
source/ariba/communication/networkinfo/AddressDiscovery.cpp
r10700 r12060 49 49 #include <ifaddrs.h> 50 50 51 #include <string> 52 #include <boost/asio/ip/address.hpp> 53 #include <boost/foreach.hpp> 54 55 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 56 #include "ariba/utility/addressing/mac_address.hpp" 57 51 58 #ifdef HAVE_LIBBLUETOOTH 52 59 #include <bluetooth/bluetooth.h> … … 58 65 namespace communication { 59 66 60 mac_address AddressDiscovery::getMacFromIF( const char* name ) { 67 68 using namespace std; 69 using namespace addressing2; 70 using namespace boost::asio::ip; 71 72 using ariba::addressing::mac_address; 73 74 mac_address getMacFromIF( const char* name ) 75 { 61 76 mac_address addr; 62 77 #ifdef HAVE_LIBBLUETOOTH … … 73 88 } 74 89 75 int AddressDiscovery::dev_info(int s, int dev_id, long arg) { 76 #ifdef HAVE_LIBBLUETOOTH 77 endpoint_set* set = (endpoint_set*)arg; 78 struct hci_dev_info di; 79 memset(&di, 0, sizeof(struct hci_dev_info)); 80 di.dev_id = dev_id; 81 if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 82 mac_address mac; 83 mac.bluetooth( di.bdaddr ); 84 address_vf vf = mac; 85 set->add(vf); 90 int dev_info(int s, int dev_id, long arg) 91 { 92 #ifdef HAVE_LIBBLUETOOTH 93 // endpoint_set* set = (endpoint_set*)arg; 94 // struct hci_dev_info di; 95 // memset(&di, 0, sizeof(struct hci_dev_info)); 96 // di.dev_id = dev_id; 97 // if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 98 // mac_address mac; 99 // mac.bluetooth( di.bdaddr ); 100 // address_vf vf = mac; 101 // set->add(vf); 86 102 #endif 87 103 return 0; 88 104 } 89 105 90 void AddressDiscovery::discover_bluetooth( endpoint_set& endpoints ) { 91 #ifdef HAVE_LIBBLUETOOTH 92 hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 93 #endif 94 } 95 96 void AddressDiscovery::discover_ip_addresses( endpoint_set& endpoints ) { 97 struct ifaddrs* ifaceBuffer = NULL; 98 void* tmpAddrPtr = NULL; 99 100 int ret = getifaddrs( &ifaceBuffer ); 101 if( ret != 0 ) return; 102 103 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { 104 105 // ignore devices that are disabled or have no ip 106 if(i == NULL) continue; 107 struct sockaddr* addr = i->ifa_addr; 108 if (addr==NULL) continue; 109 110 // ignore tun devices 111 string device = string(i->ifa_name); 112 if(device.find_first_of("tun") == 0) continue; 113 114 if (addr->sa_family == AF_INET) { 115 // look for ipv4 116 char straddr[INET_ADDRSTRLEN]; 117 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 118 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 119 ip_address ip = straddr; 120 if (ip.is_loopback()) continue; 121 address_vf vf = ip; 122 endpoints.add( vf ); 123 } else 124 if (addr->sa_family == AF_INET6) { 125 // look for ipv6 126 char straddr[INET6_ADDRSTRLEN]; 127 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 128 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 129 ip_address ip = straddr; 130 if (ip.is_loopback()) continue; 131 // if (ip.is_link_local()) continue; 132 address_vf vf = ip; 133 endpoints.add( vf ); 134 } 135 } 136 137 freeifaddrs(ifaceBuffer); 138 } 139 140 void AddressDiscovery::discover_endpoints( endpoint_set& endpoints ) { 141 discover_ip_addresses( endpoints ); 142 discover_bluetooth( endpoints ); 106 void discover_bluetooth( 107 EndpointSetPtr listenOn_endpoints, 108 EndpointSetPtr discovered_endpoints ) 109 { 110 #ifdef HAVE_LIBBLUETOOTH 111 // FIXME aktuell bluetooth 112 // hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 113 #endif 114 } 115 116 void discover_ip_addresses( 117 EndpointSetPtr listenOn_endpoints, 118 EndpointSetPtr discovered_endpoints ) 119 { 120 bool discover_ipv4 = false; 121 bool discover_ipv6 = false; 122 vector<uint16_t> ipv4_ports; 123 vector<uint16_t> ipv6_ports; 124 125 /* analyze listenOn_endpoints */ 126 BOOST_FOREACH( TcpIP_EndpointPtr endp, listenOn_endpoints->get_tcpip_endpoints() ) 127 { 128 // BRANCH: IPv4 any [0.0.0.0] 129 if ( endp->to_asio().address() == address_v4::any() ) 130 { 131 // add port 132 ipv4_ports.push_back(endp->to_asio().port()); 133 134 discover_ipv4 = true; 135 } 136 137 // BRANCH: IPv6 any [::] 138 else if ( endp->to_asio().address() == address_v6::any() ) 139 { 140 // add port 141 ipv6_ports.push_back(endp->to_asio().port()); 142 143 discover_ipv6 = true; 144 145 146 // NOTE: on linux the ipv6-any address [::] catches ipv4 as well 147 ipv4_ports.push_back(endp->to_asio().port()); 148 discover_ipv4 = true; 149 } 150 151 // BRANCH: explicit ip address 152 else 153 { 154 // ---> don't discover anything, just add it directly 155 discovered_endpoints->add_endpoint(endp); 156 } 157 } 158 159 160 /* discover addresses */ 161 if ( discover_ipv4 || discover_ipv6 ) 162 { 163 struct ifaddrs* ifaceBuffer = NULL; 164 void* tmpAddrPtr = NULL; 165 166 int ret = getifaddrs( &ifaceBuffer ); 167 if( ret != 0 ) return; 168 169 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) 170 { 171 // ignore devices that are disabled or have no ip 172 if(i == NULL) continue; 173 struct sockaddr* addr = i->ifa_addr; 174 if (addr==NULL) continue; 175 176 // // ignore tun devices // XXX why? 177 // string device = string(i->ifa_name); 178 // if(device.find_first_of("tun") == 0) continue; 179 180 // IPv4 181 if ( discover_ipv4 && addr->sa_family == AF_INET ) 182 { 183 char straddr[INET_ADDRSTRLEN]; 184 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 185 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 186 187 address ip_addr = address::from_string(straddr); 188 189 // skip loopback address 190 if ( ip_addr.to_v4() == address_v4::loopback() ) 191 continue; 192 193 // add endpoint for this address and every given ipv4 port 194 BOOST_FOREACH( uint16_t port, ipv4_ports ) 195 { 196 tcp::endpoint tcpip_endp(ip_addr, port); 197 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 198 199 discovered_endpoints->add_endpoint(endp); 200 } 201 } 202 203 // IPv6 204 else if ( discover_ipv6 && addr->sa_family == AF_INET6 ) 205 { 206 // look for ipv6 207 char straddr[INET6_ADDRSTRLEN]; 208 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 209 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 210 211 address ip_addr = address::from_string(straddr); 212 213 // skip loopback address 214 if ( ip_addr.to_v6() == address_v6::loopback() ) 215 continue; 216 217 // add endpoint for this address and every given ipv4 port 218 BOOST_FOREACH( uint16_t port, ipv6_ports ) 219 { 220 tcp::endpoint tcpip_endp(ip_addr, port); 221 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 222 223 discovered_endpoints->add_endpoint(endp); 224 } 225 } 226 } 227 228 freeifaddrs(ifaceBuffer); 229 } 230 } 231 232 233 234 EndpointSetPtr AddressDiscovery::discover_endpoints(EndpointSetPtr listenOn_endpoints) 235 { 236 EndpointSetPtr discovered_endpoints(new addressing2::endpoint_set()); 237 238 discover_ip_addresses( listenOn_endpoints, discovered_endpoints ); 239 discover_bluetooth( listenOn_endpoints, discovered_endpoints ); 240 241 return discovered_endpoints; 143 242 } 144 243 -
source/ariba/communication/networkinfo/AddressDiscovery.h
r5639 r12060 40 40 #define __ADDRESS_DISCOVERY_H 41 41 42 #include "ariba/utility/addressing/addressing.hpp" 43 44 using namespace ariba::addressing; 42 #include "ariba/utility/addressing2/endpoint_set.hpp" 45 43 46 44 namespace ariba { 47 45 namespace communication { 48 46 47 using addressing2::EndpointSetPtr; 48 49 49 class AddressDiscovery { 50 50 public: 51 static void discover_endpoints( endpoint_set& endpoints);51 static EndpointSetPtr discover_endpoints(EndpointSetPtr listenOn_endpoints); 52 52 53 53 private: 54 static mac_address getMacFromIF( const char* name ); 55 static int dev_info(int s, int dev_id, long arg); 56 static void discover_bluetooth( endpoint_set& endpoints ); 57 static void discover_ip_addresses( endpoint_set& endpoints ); 54 // TODO aktuell weg damit.. 55 // static mac_address getMacFromIF( const char* name ); 56 // static int dev_info(int s, int dev_id, long arg); 57 // static void discover_bluetooth( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 // static void discover_ip_addresses( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 59 }; 59 60 -
source/ariba/overlay/BaseOverlay.cpp
r10653 r12060 57 57 #include "ariba/utility/visual/DddVis.h" 58 58 #include "ariba/utility/visual/ServerVis.h" 59 #include <ariba/utility/misc/sha1.h> 59 60 60 61 namespace ariba { 61 62 namespace overlay { 63 64 using namespace std; 65 using ariba::transport::system_priority; 62 66 63 67 #define visualInstance ariba::utility::DddVis::instance() 64 68 #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY 65 69 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 70 71 72 // time constants (in seconds) 73 #define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s 74 75 #define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up 76 #define KEEP_ALIVE_TIME_OUT KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT // timeout: no data received on this link (incl. keep-alive messages) 77 #define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s 66 78 67 79 … … 85 97 86 98 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { 87 BOOST_FOREACH( LinkDescriptor* lp, links )99 foreach( LinkDescriptor* lp, links ) 88 100 if ((communication ? lp->communicationId : lp->overlayId) == link) 89 101 return lp; … … 92 104 93 105 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { 94 BOOST_FOREACH( const LinkDescriptor* lp, links )106 foreach( const LinkDescriptor* lp, links ) 95 107 if ((communication ? lp->communicationId : lp->overlayId) == link) 96 108 return lp; … … 122 134 123 135 /// returns a auto-link descriptor 124 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { 136 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) 137 { 125 138 // search for a descriptor that is already up 126 BOOST_FOREACH( LinkDescriptor* lp, links ) 127 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0) 128 return lp; 139 foreach( LinkDescriptor* lp, links ) 140 { 141 if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) ) 142 return lp; 143 } 144 129 145 // if not found, search for one that is about to come up... 130 BOOST_FOREACH( LinkDescriptor* lp, links ) 131 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 ) 132 return lp; 146 foreach( LinkDescriptor* lp, links ) 147 { 148 time_t now = time(NULL); 149 150 if (lp->autolink && lp->remoteNode == node && lp->service == service 151 && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT ) 152 return lp; 153 } 154 133 155 return NULL; 134 156 } … … 136 158 /// stabilizes link information 137 159 void BaseOverlay::stabilizeLinks() { 138 // send keep-alive messages over established links 139 BOOST_FOREACH( LinkDescriptor* ld, links ) { 160 time_t now = time(NULL); 161 162 // send keep-alive messages over established links 163 foreach( LinkDescriptor* ld, links ) 164 { 140 165 if (!ld->up) continue; 141 OverlayMsg msg( OverlayMsg::typeLinkAlive, 142 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 143 if (ld->relayed) msg.setRouteRecord(true); 144 send_link( &msg, ld->overlayId ); 166 167 if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME ) 168 { 169 logging_debug("[BaseOverlay] Sending KeepAlive over " 170 << ld->to_string() 171 << " after " 172 << difftime( now, ld->keepAliveSent ) 173 << "s"); 174 175 OverlayMsg msg( OverlayMsg::typeKeepAlive, 176 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 177 msg.setRouteRecord(true); 178 ld->keepAliveSent = now; 179 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 180 } 145 181 } 146 182 147 183 // iterate over all links and check for time boundaries 148 184 vector<LinkDescriptor*> oldlinks; 149 time_t now = time(NULL); 150 BOOST_FOREACH( LinkDescriptor* ld, links ) { 151 152 // keep alives and not up? yes-> link connection request is stale! 153 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) { 154 155 // increase counter 156 ld->keepAliveMissed++; 157 158 // missed more than four keep-alive messages (10 sec)? -> drop link 159 if (ld->keepAliveMissed > 4) { 160 logging_info( "Link connection request is stale, closing: " << ld ); 161 oldlinks.push_back( ld ); 162 continue; 163 } 185 foreach( LinkDescriptor* ld, links ) { 186 187 // link connection request stale? 188 if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request 189 { 190 logging_info( "Link connection request is stale, closing: " << ld ); 191 ld->failed = true; 192 oldlinks.push_back( ld ); 193 continue; 164 194 } 165 195 166 196 if (!ld->up) continue; 167 197 198 199 200 168 201 // check if link is relayed and retry connecting directly 202 // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply 169 203 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) { 170 204 ld->retryCounter--; … … 173 207 174 208 // remote used as relay flag 175 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)209 if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ?? 176 210 ld->relaying = false; 177 211 … … 183 217 184 218 // auto-link time exceeded? 185 if ( ld->autolink && difftime( now, ld->lastuse ) > 30) {219 if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) { 186 220 oldlinks.push_back( ld ); 187 221 continue; … … 189 223 190 224 // keep alives missed? yes-> 191 if ( difftime( now, ld->keepAliveTime ) > 4 ) { 192 193 // increase counter 194 ld->keepAliveMissed++; 195 196 // missed more than four keep-alive messages (4 sec)? -> drop link 197 if (ld->keepAliveMissed >= 2) { 198 logging_info( "Link is stale, closing: " << ld ); 199 oldlinks.push_back( ld ); 200 continue; 201 } 225 if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT ) 226 { 227 logging_info( "Link is stale, closing: " << ld ); 228 ld->failed = true; 229 oldlinks.push_back( ld ); 230 continue; 202 231 } 203 232 } 204 233 205 234 // drop links 206 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {235 foreach( LinkDescriptor* ld, oldlinks ) { 207 236 logging_info( "Link timed out. Dropping " << ld ); 208 237 ld->relaying = false; … … 210 239 } 211 240 212 // show link state 213 counter++; 214 if (counter>=4) showLinks(); 215 if (counter>=4 || counter<0) counter = 0; 241 242 243 244 // show link state (debug output) 245 if (counter>=10 || counter<0) 246 { 247 showLinks(); 248 counter = 0; 249 } 250 else 251 { 252 counter++; 253 } 216 254 } 217 255 … … 230 268 231 269 int i=0; 232 BOOST_FOREACH( LinkDescriptor* ld, links ) {233 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;270 foreach( LinkDescriptor* ld, links ) { 271 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue; 234 272 bool found = false; 235 BOOST_FOREACH(NodeID& id, nodes)273 foreach(NodeID& id, nodes) 236 274 if (id == ld->remoteNode) found = true; 237 275 if (found) continue; … … 261 299 int i=0; 262 300 logging_info("--- link state -------------------------------"); 263 BOOST_FOREACH( LinkDescriptor* ld, links ) {301 foreach( LinkDescriptor* ld, links ) { 264 302 string epd = ""; 265 if (ld->isDirectVital()) 266 epd = getEndpointDescriptor(ld->remoteNode).toString(); 303 if (isLinkDirectVital(ld)) 304 { 305 // epd = getEndpointDescriptor(ld->remoteNode).toString(); 306 307 epd = "Connection: "; 308 epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string(); 309 epd += " <---> "; 310 epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string(); 311 } 267 312 268 313 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd); … … 289 334 // internal message delivery --------------------------------------------------- 290 335 336 337 seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ) 338 { 339 // set priority 340 message->setPriority(priority); 341 342 // wrap old OverlayMsg into reboost message 343 reboost::message_t wrapped_message = message->wrap_up_for_sending(); 344 345 // send down to BaseCommunication 346 try 347 { 348 // * send * 349 return bc->sendMessage(bc_link, wrapped_message, priority, false); 350 } 351 catch ( communication::communication_message_not_sent& e ) 352 { 353 ostringstream out; 354 out << "Communication message not sent: " << e.what(); 355 throw message_not_sent(out.str()); 356 } 357 358 throw logic_error("This should never happen!"); 359 } 360 361 291 362 /// routes a message to its destination node 292 void BaseOverlay::route( OverlayMsg* message ) {293 363 void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop ) 364 { 294 365 // exceeded time-to-live? yes-> drop message 295 if (message->getNumHops() > message->getTimeToLive()) { 296 logging_warn("Message exceeded TTL. Dropping message and relay routes" 297 "for recovery."); 366 if (message->getNumHops() > message->getTimeToLive()) 367 { 368 logging_warn("Message exceeded TTL. Dropping message and relay routes " 369 << "for recovery. Hop count: " << (int) message->getNumHops()); 298 370 removeRelayNode(message->getDestinationNode()); 299 371 return; … … 301 373 302 374 // no-> forward message 303 else { 375 else 376 { 304 377 // destinastion myself? yes-> handle message 305 if (message->getDestinationNode() == nodeId) { 306 logging_warn("Usually I should not route messages to myself!"); 307 Message msg; 308 msg.encapsulate(message); 309 handleMessage( &msg, NULL ); 310 } else { 311 // no->send message to next hop 312 send( message, message->getDestinationNode() ); 313 } 314 } 378 if (message->getDestinationNode() == nodeId) 379 { 380 logging_warn("Usually I should not route messages to myself. And I won't!"); 381 } 382 383 // no->send message to next hop 384 else 385 { 386 try 387 { 388 /* (deep) packet inspection to determine priority */ 389 // BRANCH: typeData --> send with low priority 390 if ( message->getType() == OverlayMsg::typeData ) 391 { 392 // TODO think about implementing explicit routing queue (with active queue management??) 393 send( message, 394 message->getDestinationNode(), 395 message->getPriority(), 396 last_hop ); 397 } 398 // BRANCH: internal message --> send with higher priority 399 else 400 { 401 send( message, 402 message->getDestinationNode(), 403 system_priority::HIGH, 404 last_hop ); 405 } 406 } 407 catch ( message_not_sent& e ) 408 { 409 logging_warn("Unable to route message of type " 410 << message->getType() 411 << " to " 412 << message->getDestinationNode() 413 << ". Reason: " 414 << e.what()); 415 416 // inform sender 417 if ( message->getType() != OverlayMsg::typeMessageLost ) 418 { 419 report_lost_message(message); 420 } 421 } 422 } 423 } 424 } 425 426 void BaseOverlay::report_lost_message( const OverlayMsg* message ) 427 { 428 OverlayMsg reply(OverlayMsg::typeMessageLost); 429 reply.setSeqNum(message->getSeqNum()); 430 431 /** 432 * MessageLost-Message 433 * 434 * - Type of lost message 435 * - Hop count of lost message 436 * - Source-LinkID of lost message 437 */ 438 reboost::shared_buffer_t b(sizeof(uint8_t)*2); 439 b.mutable_data()[0] = message->getType(); 440 b.mutable_data()[1] = message->getNumHops(); 441 reply.append_buffer(b); 442 reply.append_buffer(message->getSourceLink().serialize()); 443 444 try 445 { 446 send_node(&reply, message->getSourceNode(), 447 system_priority::OVERLAY, 448 OverlayInterface::OVERLAY_SERVICE_ID); 449 } 450 catch ( message_not_sent& e ) 451 { 452 logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too."); 453 } 315 454 } 316 455 317 456 /// sends a message to another node, delivers it to the base overlay class 318 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) { 457 seqnum_t BaseOverlay::send( OverlayMsg* message, 458 const NodeID& destination, 459 uint8_t priority, 460 const NodeID& last_hop ) throw(message_not_sent) 461 { 319 462 LinkDescriptor* next_link = NULL; 320 463 321 464 // drop messages to unspecified destinations 322 if (destination.isUnspecified()) return -1; 323 324 // send messages to myself -> handle message and drop warning! 325 if (destination == nodeId) { 326 logging_warn("Sent message to myself. Handling message.") 327 Message msg; 328 msg.encapsulate(message); 329 handleMessage( &msg, NULL ); 330 return -1; 465 if (destination.isUnspecified()) 466 throw message_not_sent("No destination specified. Drop!"); 467 468 // send messages to myself -> drop! 469 // TODO maybe this is not what we want. why not just deliver this message? 470 // There is a similar check in the route function, there it should be okay. 471 if (destination == nodeId) 472 { 473 logging_warn("Sent message to myself. Drop!"); 474 475 throw message_not_sent("Sent message to myself. Drop!"); 331 476 } 332 477 333 478 // use relay path? 334 if (message->isRelayed()) { 479 if (message->isRelayed()) 480 { 335 481 next_link = getRelayLinkTo( destination ); 336 if (next_link != NULL) { 482 483 if (next_link != NULL) 484 { 337 485 next_link->setRelaying(); 338 return bc->sendMessage(next_link->communicationId, message); 339 } else { 340 logging_warn("Could not send message. No relay hop found to " 341 << destination << " -- trying to route over overlay paths ...") 342 // logging_error("ERROR: " << debugInformation() ); 343 // return -1; 344 } 345 } 346 486 487 // * send message over relayed link * 488 return send_overlaymessage_down(message, next_link->communicationId, priority); 489 } 490 else 491 { 492 logging_warn("No relay hop found to " << destination 493 << " -- trying to route over overlay paths ...") 494 } 495 } 496 497 347 498 // last resort -> route over overlay path 348 499 LinkID next_id = overlayInterface->getNextLinkId( destination ); 349 if (next_id.isUnspecified()) { 350 logging_warn("Could not send message. No next hop found to " << 351 destination ); 352 logging_error("ERROR: " << debugInformation() ); 353 return -1; 354 } 355 356 // get link descriptor, up and running? yes-> send message 500 if ( next_id.isUnspecified() ) 501 { 502 // apperently we're the closest node --> try second best node 503 // NOTE: This is helpful if our routing table is not up-to-date, but 504 // may lead to circles. So we have to be careful. 505 std::vector<const LinkID*> next_ids = 506 overlayInterface->getSortedLinkIdsTowardsNode( destination ); 507 508 for ( int i = 0; i < next_ids.size(); i++ ) 509 { 510 const LinkID& link = *next_ids[i]; 511 512 if ( ! link.isUnspecified() ) 513 { 514 next_id = link; 515 516 break; 517 } 518 } 519 520 // still no next hop found. drop. 521 if ( next_id.isUnspecified() ) 522 { 523 logging_warn("Could not send message. No next hop found to " << 524 destination ); 525 logging_error("ERROR: " << debugInformation() ); 526 527 throw message_not_sent("No next hop found."); 528 } 529 } 530 531 532 /* get link descriptor, do some checks and send message */ 357 533 next_link = getDescriptor(next_id); 358 if (next_link != NULL && next_link->up) { 359 // send message over relayed link 360 return send(message, next_link); 361 } 362 363 // no-> error, dropping message 364 else { 365 logging_warn("Could not send message. Link not known or up"); 366 logging_error("ERROR: " << debugInformation() ); 367 return -1; 368 } 369 370 // not reached-> fail 371 return -1; 372 } 534 535 // check pointer 536 if ( next_link == NULL ) 537 { 538 // NOTE: this shuldn't happen 539 throw message_not_sent("Could not send message. Link not known."); 540 } 541 542 // avoid circles 543 if ( next_link->remoteNode == last_hop ) 544 { 545 // XXX logging_debug 546 logging_info("Possible next hop would create a circle: " 547 << next_link->remoteNode); 548 549 throw message_not_sent("Could not send message. Possible next hop would create a circle."); 550 } 551 552 // check if link is up 553 if ( ! next_link->up) 554 { 555 logging_warn("Could not send message. Link not up"); 556 logging_error("ERROR: " << debugInformation() ); 557 558 throw message_not_sent("Could not send message. Link not up"); 559 } 560 561 // * send message over overlay link * 562 return send(message, next_link, priority); 563 } 564 373 565 374 566 /// send a message using a link descriptor, delivers it to the base overlay class 375 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) { 567 seqnum_t BaseOverlay::send( OverlayMsg* message, 568 LinkDescriptor* ldr, 569 uint8_t priority ) throw(message_not_sent) 570 { 376 571 // check if null 377 if (ldr == NULL) { 378 logging_error("Can not send message to " << message->getDestinationAddress()); 379 return -1; 572 if (ldr == NULL) 573 { 574 ostringstream out; 575 out << "Can not send message to " << message->getDestinationAddress(); 576 throw message_not_sent(out.str()); 380 577 } 381 578 382 579 // check if up 383 if ( !ldr->up && !ignore_down) {384 logging_error("Can not send message. Link not up:" << ldr );580 if ( !ldr->up ) 581 { 385 582 logging_error("DEBUG_INFO: " << debugInformation() ); 386 return -1; 387 } 388 LinkDescriptor* ld = NULL; 389 390 // handle relayed link 391 if (ldr->relayed) { 583 584 ostringstream out; 585 out << "Can not send message. Link not up:" << ldr->to_string(); 586 throw message_not_sent(out.str()); 587 } 588 589 LinkDescriptor* next_hop_ld = NULL; 590 591 // BRANCH: relayed link 592 if (ldr->relayed) 593 { 392 594 logging_debug("Resolving direct link for relayed link to " 393 595 << ldr->remoteNode); 394 ld = getRelayLinkTo( ldr->remoteNode ); 395 if (ld==NULL) { 396 logging_error("No relay path found to link " << ldr ); 596 597 next_hop_ld = getRelayLinkTo( ldr->remoteNode ); 598 599 if (next_hop_ld==NULL) 600 { 397 601 logging_error("DEBUG_INFO: " << debugInformation() ); 398 return -1; 399 } 400 ld->setRelaying(); 602 603 ostringstream out; 604 out << "No relay path found to link: " << ldr; 605 throw message_not_sent(out.str()); 606 } 607 608 next_hop_ld->setRelaying(); 401 609 message->setRelayed(true); 402 } else 403 ld = ldr; 404 405 // handle direct link 406 if (ld->communicationUp) { 407 logging_debug("send(): Sending message over direct link."); 408 return bc->sendMessage( ld->communicationId, message ); 409 } else { 410 logging_error("send(): Could not send message. " 411 "Not a relayed link and direct link is not up."); 412 return -1; 413 } 414 return -1; 610 } 611 // BRANCH: direct link 612 else 613 { 614 next_hop_ld = ldr; 615 } 616 617 618 // check next hop-link 619 if ( ! next_hop_ld->communicationUp) 620 { 621 throw message_not_sent( "send(): Could not send message." 622 " Not a relayed link and direct link is not up." ); 623 } 624 625 // send over next link 626 logging_debug("send(): Sending message over direct link."); 627 return send_overlaymessage_down(message, next_hop_ld->communicationId, priority); 628 415 629 } 416 630 417 631 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, 418 const ServiceID& service) { 632 uint8_t priority, const ServiceID& service) throw(message_not_sent) 633 { 419 634 message->setSourceNode(nodeId); 420 635 message->setDestinationNode(remote); 421 636 message->setService(service); 422 return send( message, remote ); 423 } 424 425 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) { 637 return send( message, remote, priority ); 638 } 639 640 void BaseOverlay::send_link( OverlayMsg* message, 641 const LinkID& link, 642 uint8_t priority ) throw(message_not_sent) 643 { 426 644 LinkDescriptor* ld = getDescriptor(link); 427 if (ld==NULL) { 428 logging_error("Cannot find descriptor to link id=" << link.toString()); 429 return -1; 430 } 645 if (ld==NULL) 646 { 647 throw message_not_sent("Cannot find descriptor to link id=" + link.toString()); 648 } 649 431 650 message->setSourceNode(nodeId); 432 651 message->setDestinationNode(ld->remoteNode); … … 437 656 message->setService(ld->service); 438 657 message->setRelayed(ld->relayed); 439 return send( message, ld, ignore_down ); 658 659 660 try 661 { 662 // * send message * 663 send( message, ld, priority ); 664 } 665 catch ( message_not_sent& e ) 666 { 667 // drop failed link 668 ld->failed = true; 669 dropLink(ld->overlayId); 670 } 440 671 } 441 672 … … 451 682 // relay link still used and alive? 452 683 if (ld==NULL 453 || !ld->isDirectVital() 454 || difftime(route.used, time(NULL)) > 8) { 684 || !isLinkDirectVital(ld) 685 || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better? 686 { 455 687 logging_info("Forgetting relay information to node " 456 688 << route.node.toString() ); … … 488 720 if (message->isRelayed()) { 489 721 // try to find source node 490 BOOST_FOREACH( relay_route& route, relay_routes ) {722 foreach( relay_route& route, relay_routes ) { 491 723 // relay route found? yes-> 492 724 if ( route.node == message->getDestinationNode() ) { … … 504 736 505 737 // try to find source node 506 BOOST_FOREACH( relay_route& route, relay_routes ) {738 foreach( relay_route& route, relay_routes ) { 507 739 508 740 // relay route found? yes-> … … 516 748 if (route.hops > message->getNumHops() 517 749 || rld == NULL 518 || ! rld->isDirectVital()) {750 || !isLinkDirectVital(ld)) { 519 751 logging_info("Updating relay information to node " 520 752 << route.node.toString() 521 << " reducing to " << message->getNumHops() << " hops.");753 << " reducing to " << (int) message->getNumHops() << " hops."); 522 754 route.hops = message->getNumHops(); 523 755 route.link = ld->overlayId; … … 542 774 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) { 543 775 // try to find source node 544 BOOST_FOREACH( relay_route& route, relay_routes ) {776 foreach( relay_route& route, relay_routes ) { 545 777 if (route.node == remote ) { 546 778 LinkDescriptor* ld = getDescriptor( route.link ); 547 if (ld==NULL || ! ld->isDirectVital()) return NULL; else {779 if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else { 548 780 route.used = time(NULL); 549 781 return ld; … … 575 807 // ---------------------------------------------------------------------------- 576 808 577 void BaseOverlay::start( BaseCommunication &_basecomm, const NodeID& _nodeid ) {809 void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) { 578 810 logging_info("Starting..."); 579 811 580 812 // set parameters 581 bc = &_basecomm;813 bc = _basecomm; 582 814 nodeId = _nodeid; 583 815 … … 587 819 588 820 // timer for auto link management 589 Timer::setInterval( 1000 ); 821 Timer::setInterval( 1000 ); // XXX 822 // Timer::setInterval( 10000 ); 590 823 Timer::start(); 591 824 … … 641 874 overlayInterface->joinOverlay(); 642 875 state = BaseOverlayStateCompleted; 643 BOOST_FOREACH( NodeListener* i, nodeListeners )876 foreach( NodeListener* i, nodeListeners ) 644 877 i->onJoinCompleted( spovnetId ); 645 878 … … 682 915 // gather all service links 683 916 vector<LinkID> servicelinks; 684 BOOST_FOREACH( LinkDescriptor* ld, links ) { 917 foreach( LinkDescriptor* ld, links ) 918 { 685 919 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) 686 920 servicelinks.push_back( ld->overlayId ); … … 688 922 689 923 // drop all service links 690 BOOST_FOREACH( LinkID lnk, servicelinks ) 691 dropLink( lnk ); 924 foreach( LinkID lnk, servicelinks ) 925 { 926 logging_debug("Dropping service link " << lnk.toString()); 927 dropLink( lnk ); 928 } 692 929 693 930 // let the node leave the spovnet overlay interface 694 931 logging_debug( "Leaving overlay" ); 695 932 if( overlayInterface != NULL ) 933 { 696 934 overlayInterface->leaveOverlay(); 935 } 697 936 698 937 // drop still open bootstrap links 699 BOOST_FOREACH( LinkID lnk, bootstrapLinks ) 700 bc->dropLink( lnk ); 938 foreach( LinkID lnk, bootstrapLinks ) 939 { 940 logging_debug("Dropping bootstrap link " << lnk.toString()); 941 bc->dropLink( lnk ); 942 } 701 943 702 944 // change to inalid state … … 708 950 709 951 // inform all registered services of the event 710 BOOST_FOREACH( NodeListener* i, nodeListeners ) { 952 foreach( NodeListener* i, nodeListeners ) 953 { 711 954 if( ret ) i->onLeaveCompleted( spovnetId ); 712 955 else i->onLeaveFailed( spovnetId ); … … 731 974 state = BaseOverlayStateInvalid; 732 975 733 BOOST_FOREACH( NodeListener* i, nodeListeners )976 foreach( NodeListener* i, nodeListeners ) 734 977 i->onJoinFailed( spovnetId ); 735 978 … … 783 1026 const ServiceID& service ) { 784 1027 1028 // TODO What if we already have a Link to this node and this service id? 1029 785 1030 // do not establish a link to myself! 786 if (remote == nodeId) return LinkID::UNSPECIFIED; 787 1031 if (remote == nodeId) return 1032 LinkID::UNSPECIFIED; 1033 1034 788 1035 // create a link descriptor 789 1036 LinkDescriptor* ld = addDescriptor(); … … 792 1039 ld->service = service; 793 1040 ld->listener = getListener(ld->service); 1041 1042 // initialize sequence numbers 1043 ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 1044 logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum); 794 1045 795 1046 // create link request message … … 800 1051 msg.setRelayed(true); 801 1052 msg.setRegisterRelay(true); 1053 // msg.setRouteRecord(true); 1054 1055 msg.setSeqNum(ld->last_sent_seqnum); 802 1056 803 1057 // debug message … … 809 1063 ); 810 1064 1065 811 1066 // sending message to node 812 send_node( &msg, ld->remoteNode, ld->service ); 813 1067 try 1068 { 1069 // * send * 1070 seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service ); 1071 } 1072 catch ( message_not_sent& e ) 1073 { 1074 logging_warn("Link request not sent: " << e.what()); 1075 1076 // Message not sent. Cancel link request. 1077 SystemQueue::instance().scheduleCall( 1078 boost::bind( 1079 &BaseOverlay::__onLinkEstablishmentFailed, 1080 this, 1081 ld->overlayId) 1082 ); 1083 } 1084 814 1085 return ld->overlayId; 815 1086 } 816 1087 1088 /// NOTE: "id" is an Overlay-LinkID 1089 void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id) 1090 { 1091 // TODO This code redundant. But also it's not easy to aggregate in one function. 1092 1093 // get descriptor for link 1094 LinkDescriptor* ld = getDescriptor(id, false); 1095 if ( ld == NULL ) return; // not found? ->ignore! 1096 1097 logging_debug( "__onLinkEstablishmentFaild: " << ld ); 1098 1099 // removing relay link information 1100 removeRelayLink(ld->overlayId); 1101 1102 // inform listeners about link down 1103 ld->communicationUp = false; 1104 if (!ld->service.isUnspecified()) 1105 { 1106 CommunicationListener* lst = getListener(ld->service); 1107 if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode ); 1108 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1109 } 1110 1111 // delete all queued messages (auto links) 1112 if( ld->messageQueue.size() > 0 ) { 1113 logging_warn( "Dropping link " << id.toString() << " that has " 1114 << ld->messageQueue.size() << " waiting messages" ); 1115 ld->flushQueue(); 1116 } 1117 1118 // erase mapping 1119 eraseDescriptor(ld->overlayId); 1120 } 1121 1122 817 1123 /// drops an established link 818 void BaseOverlay::dropLink(const LinkID& link) { 819 logging_info( "Dropping link (initiated locally):" << link.toString() ); 1124 void BaseOverlay::dropLink(const LinkID& link) 1125 { 1126 logging_info( "Dropping link: " << link.toString() ); 820 1127 821 1128 // find the link item to drop 822 1129 LinkDescriptor* ld = getDescriptor(link); 823 if( ld == NULL ) { 1130 if( ld == NULL ) 1131 { 824 1132 logging_warn( "Can't drop link, link is unknown!"); 825 1133 return; … … 827 1135 828 1136 // delete all queued messages 829 if( ld->messageQueue.size() > 0 ) { 1137 if( ld->messageQueue.size() > 0 ) 1138 { 830 1139 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " 831 1140 << ld->messageQueue.size() << " waiting messages" ); 832 1141 ld->flushQueue(); 833 1142 } 834 835 // inform sideport and listener 836 if(ld->listener != NULL) 837 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 838 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 839 840 // do not drop relay links 841 if (!ld->relaying) { 842 // drop the link in base communication 843 if (ld->communicationUp) bc->dropLink( ld->communicationId ); 844 845 // erase descriptor 846 eraseDescriptor( ld->overlayId ); 847 } else { 848 ld->dropAfterRelaying = true; 849 } 1143 1144 1145 // inform application and remote note (but only once) 1146 // NOTE: If we initiated the drop, this function is called twice, but on 1147 // the second call, there is noting to do. 1148 if ( ld->up && ! ld->failed ) 1149 { 1150 // inform sideport and listener 1151 if(ld->listener != NULL) 1152 { 1153 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 1154 } 1155 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 1156 1157 // send link-close to remote node 1158 logging_info("Sending LinkClose message to remote node."); 1159 OverlayMsg close_msg(OverlayMsg::typeLinkClose); 1160 send_link(&close_msg, link, system_priority::OVERLAY); 1161 1162 // deactivate link 1163 ld->up = false; 1164 // ld->closing = true; 1165 } 1166 1167 else if ( ld->failed ) 1168 { 1169 // inform listener 1170 if( ld->listener != NULL ) 1171 { 1172 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1173 } 1174 1175 ld->up = false; 1176 __removeDroppedLink(ld->overlayId); 1177 } 1178 } 1179 1180 /// called from typeLinkClose-handler 1181 void BaseOverlay::__removeDroppedLink(const LinkID& link) 1182 { 1183 // find the link item to drop 1184 LinkDescriptor* ld = getDescriptor(link); 1185 if( ld == NULL ) 1186 { 1187 return; 1188 } 1189 1190 // do not drop relay links 1191 if (!ld->relaying) 1192 { 1193 // drop the link in base communication 1194 if (ld->communicationUp) 1195 { 1196 bc->dropLink( ld->communicationId ); 1197 } 1198 1199 // erase descriptor 1200 eraseDescriptor( ld->overlayId ); 1201 } 1202 else 1203 { 1204 ld->dropAfterRelaying = true; 1205 } 850 1206 } 851 1207 … … 853 1209 854 1210 /// internal send message, always use this functions to send messages over links 855 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) { 1211 const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message, 1212 const LinkID& link, 1213 uint8_t priority ) throw(message_not_sent) 1214 { 856 1215 logging_debug( "Sending data message on link " << link.toString() ); 857 1216 858 1217 // get the mapping for this link 859 1218 LinkDescriptor* ld = getDescriptor(link); 860 if( ld == NULL ) { 861 logging_error("Could not send message. " 862 << "Link not found id=" << link.toString()); 863 return -1; 1219 if( ld == NULL ) 1220 { 1221 throw message_not_sent("Could not send message. Link not found id=" + link.toString()); 864 1222 } 865 1223 866 1224 // check if the link is up yet, if its an auto link queue message 867 if( !ld->up ) { 1225 if( !ld->up ) 1226 { 868 1227 ld->setAutoUsed(); 869 if( ld->autolink ) { 1228 if( ld->autolink ) 1229 { 870 1230 logging_info("Auto-link " << link.toString() << " not up, queue message"); 871 Data data = data_serialize( message ); 872 const_cast<Message*>(message)->dropPayload(); 873 ld->messageQueue.push_back( new Message(data) ); 874 } else { 875 logging_error("Link " << link.toString() << " not up, drop message"); 876 } 877 return -1; 878 } 879 880 // compile overlay message (has service and node id) 881 OverlayMsg overmsg( OverlayMsg::typeData ); 882 overmsg.encapsulate( const_cast<Message*>(message) ); 883 884 // send message over relay/direct/overlay 885 return send_link( &overmsg, ld->overlayId ); 886 } 887 888 889 seqnum_t BaseOverlay::sendMessage(const Message* message, 890 const NodeID& node, const ServiceID& service) { 1231 1232 // queue message 1233 LinkDescriptor::message_queue_entry msg; 1234 msg.message = message; 1235 msg.priority = priority; 1236 1237 ld->messageQueue.push_back( msg ); 1238 1239 return SequenceNumber::DISABLED; // TODO what to return if message is queued? 1240 } 1241 else 1242 { 1243 throw message_not_sent("Link " + link.toString() + " not up, drop message"); 1244 } 1245 } 1246 1247 // TODO AKTUELL: sequence numbers 1248 // TODO seqnum on fast path ? 1249 ld->last_sent_seqnum.increment(); 1250 1251 /* choose fast-path for direct links; normal overlay-path otherwise */ 1252 // BRANCH: direct link 1253 if ( ld->communicationUp && !ld->relayed ) 1254 { 1255 // * send down to BaseCommunication * 1256 try 1257 { 1258 bc->sendMessage(ld->communicationId, message, priority, true); 1259 } 1260 catch ( communication::communication_message_not_sent& e ) 1261 { 1262 ostringstream out; 1263 out << "Communication message on fast-path not sent: " << e.what(); 1264 throw message_not_sent(out.str()); 1265 } 1266 } 1267 1268 // BRANCH: use (slow) overlay-path 1269 else 1270 { 1271 // compile overlay message (has service and node id) 1272 OverlayMsg overmsg( OverlayMsg::typeData ); 1273 overmsg.set_payload_message(message); 1274 1275 // set SeqNum 1276 if ( ld->transmit_seqnums ) 1277 { 1278 overmsg.setSeqNum(ld->last_sent_seqnum); 1279 } 1280 logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum()); 1281 1282 // send message over relay/direct/overlay 1283 send_link( &overmsg, ld->overlayId, priority ); 1284 } 1285 1286 // return seqnum 1287 return ld->last_sent_seqnum; 1288 } 1289 1290 1291 const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message, 1292 const NodeID& node, uint8_t priority, const ServiceID& service) { 891 1293 892 1294 // find link for node and service … … 907 1309 if( ld == NULL ) { 908 1310 logging_error( "Failed to establish auto-link."); 909 return -1;1311 throw message_not_sent("Failed to establish auto-link."); 910 1312 } 911 1313 ld->autolink = true; … … 920 1322 921 1323 // send / queue message 922 return sendMessage( message, ld->overlayId );923 } 924 925 926 NodeID BaseOverlay::sendMessageCloserToNodeID( const Message*message,927 const NodeID& address, const ServiceID& service) {1324 return sendMessage( message, ld->overlayId, priority ); 1325 } 1326 1327 1328 NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message, 1329 const NodeID& address, uint8_t priority, const ServiceID& service) { 928 1330 929 1331 if ( overlayInterface->isClosestNodeTo(address) ) … … 936 1338 if ( closest_node != NodeID::UNSPECIFIED ) 937 1339 { 938 se qnum_t seqnum = sendMessage(message, closest_node, service);1340 sendMessage(message, closest_node, priority, service); 939 1341 } 940 1342 941 return closest_node; // XXXreturn seqnum ?? tuple? closest_node via (non const) reference?1343 return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference? 942 1344 } 943 1345 // ---------------------------------------------------------------------------- … … 978 1380 979 1381 // see if we can find the node in our own table 980 BOOST_FOREACH(const LinkDescriptor* ld, links){1382 foreach(const LinkDescriptor* ld, links){ 981 1383 if(ld->remoteNode != node) continue; 982 1384 if(!ld->communicationUp) continue; … … 1079 1481 1080 1482 void BaseOverlay::onLinkUp(const LinkID& id, 1081 const address_v* local, const address_v* remote) { 1483 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 1484 { 1082 1485 logging_debug( "Link up with base communication link id=" << id ); 1083 1486 … … 1085 1488 LinkDescriptor* ld = getDescriptor(id, true); 1086 1489 1087 // handle bootstrap link we initiated1490 // BRANCH: handle bootstrap link we initiated 1088 1491 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){ 1089 1492 logging_info( 1090 1493 "Join has been initiated by me and the link is now up. " << 1494 "LinkID: " << id.toString() << 1091 1495 "Sending out join request for SpoVNet " << spovnetId.toString() 1092 1496 ); … … 1096 1500 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1097 1501 JoinRequest joinRequest( spovnetId, nodeId ); 1098 overlayMsg.encapsulate( &joinRequest ); 1099 bc->sendMessage( id, &overlayMsg ); 1502 overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer()); 1503 1504 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1505 1100 1506 return; 1101 1507 } 1102 1508 1103 // no link found? ->link establishment from remote, add one!1509 // BRANCH: link establishment from remote, add one! 1104 1510 if (ld == NULL) { 1105 1511 ld = addDescriptor( id ); … … 1115 1521 // in this case, do not inform listener, since service it unknown 1116 1522 // -> wait for update message! 1117 1118 // link mapping found? -> send update message with node-id and service id 1119 } else { 1523 } 1524 1525 // BRANCH: We requested this link in the first place 1526 else 1527 { 1120 1528 logging_info( "onLinkUp descriptor (initiated locally):" << ld ); 1121 1529 … … 1126 1534 ld->fromRemote = false; 1127 1535 1128 // if link is a relayed link->convert to direct link 1129 if (ld->relayed) { 1130 logging_info( "Converting to direct link: " << ld ); 1536 // BRANCH: this was a relayed link before --> convert to direct link 1537 // TODO do we really have to send a message here? 1538 if (ld->relayed) 1539 { 1131 1540 ld->up = true; 1132 1541 ld->relayed = false; 1542 logging_info( "Converting to direct link: " << ld ); 1543 1544 // send message 1133 1545 OverlayMsg overMsg( OverlayMsg::typeLinkDirect ); 1134 1546 overMsg.setSourceLink( ld->overlayId ); 1135 1547 overMsg.setDestinationLink( ld->remoteLink ); 1136 send_link( &overMsg, ld->overlayId ); 1137 } else { 1548 send_link( &overMsg, ld->overlayId, system_priority::OVERLAY ); 1549 1550 // inform listener 1551 if( ld->listener != NULL) 1552 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1553 } 1554 1555 1556 /* NOTE: Chord is opening direct-links in it's setup routine which are 1557 * neither set to "relayed" nor to "up". To activate these links a 1558 * typeLinkUpdate must be sent. 1559 * 1560 * This branch is would also be taken when we had a working link before 1561 * (ld->up == true). I'm not sure if this case does actually happen 1562 * and whether it's tested. 1563 */ 1564 else 1565 { 1138 1566 // note: necessary to validate the link on the remote side! 1139 1567 logging_info( "Sending out update" << … … 1144 1572 // compile and send update message 1145 1573 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate ); 1146 overlayMsg.setSourceLink(ld->overlayId);1147 1574 overlayMsg.setAutoLink( ld->autolink ); 1148 send_link( &overlayMsg, ld->overlayId, true ); 1575 overlayMsg.setSourceNode(nodeId); 1576 overlayMsg.setDestinationNode(ld->remoteNode); 1577 overlayMsg.setSourceLink(ld->overlayId); 1578 overlayMsg.setDestinationLink(ld->remoteLink); 1579 overlayMsg.setService(ld->service); 1580 overlayMsg.setRelayed(false); 1581 1582 // TODO ld->communicationId = id ?? 1583 1584 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1149 1585 } 1150 1586 } … … 1152 1588 1153 1589 void BaseOverlay::onLinkDown(const LinkID& id, 1154 const address_v* local, const address_v* remote) { 1155 1590 const addressing2::EndpointPtr local, 1591 const addressing2::EndpointPtr remote) 1592 { 1156 1593 // erase bootstrap links 1157 1594 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); … … 1185 1622 } 1186 1623 1624 1625 void BaseOverlay::onLinkFail(const LinkID& id, 1626 const addressing2::EndpointPtr local, 1627 const addressing2::EndpointPtr remote) 1628 { 1629 logging_debug( "Link fail with base communication link id=" << id ); 1630 1631 // // erase bootstrap links 1632 // vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1633 // if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1634 // 1635 // // get descriptor for link 1636 // LinkDescriptor* ld = getDescriptor(id, true); 1637 // if ( ld == NULL ) return; // not found? ->ignore! 1638 // logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1639 // 1640 // // inform listeners 1641 // ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1642 // sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1643 1644 logging_debug( " ... calling onLinkDown ..." ); 1645 onLinkDown(id, local, remote); 1646 } 1647 1648 1187 1649 void BaseOverlay::onLinkChanged(const LinkID& id, 1188 const address_v* oldlocal, const address_v* newlocal, 1189 const address_v* oldremote, const address_v* newremote) { 1190 1191 // get descriptor for link 1192 LinkDescriptor* ld = getDescriptor(id, true); 1193 if ( ld == NULL ) return; // not found? ->ignore! 1194 logging_debug( "onLinkChanged descriptor: " << ld ); 1195 1196 // inform listeners 1197 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1198 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1199 1200 // autolinks: refresh timestamp 1201 ld->setAutoUsed(); 1202 } 1203 1204 void BaseOverlay::onLinkFail(const LinkID& id, 1205 const address_v* local, const address_v* remote) { 1206 logging_debug( "Link fail with base communication link id=" << id ); 1207 1208 // erase bootstrap links 1209 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1210 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1211 1212 // get descriptor for link 1213 LinkDescriptor* ld = getDescriptor(id, true); 1214 if ( ld == NULL ) return; // not found? ->ignore! 1215 logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1216 1217 // inform listeners 1218 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1219 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1220 } 1221 1222 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local, 1223 const address_v* remote, const QoSParameterSet& qos) { 1224 logging_debug( "Link quality changed with base communication link id=" << id ); 1225 1226 // get descriptor for link 1227 LinkDescriptor* ld = getDescriptor(id, true); 1228 if ( ld == NULL ) return; // not found? ->ignore! 1229 logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1230 } 1231 1232 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local, 1233 const address_v* remote ) { 1650 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 1651 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 1652 { 1653 // get descriptor for link 1654 LinkDescriptor* ld = getDescriptor(id, true); 1655 if ( ld == NULL ) return; // not found? ->ignore! 1656 logging_debug( "onLinkChanged descriptor: " << ld ); 1657 1658 // inform listeners 1659 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1660 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1661 1662 // autolinks: refresh timestamp 1663 ld->setAutoUsed(); 1664 } 1665 1666 //void BaseOverlay::onLinkQoSChanged(const LinkID& id, 1667 // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 1668 // const QoSParameterSet& qos) 1669 //{ 1670 // logging_debug( "Link quality changed with base communication link id=" << id ); 1671 // 1672 // // get descriptor for link 1673 // LinkDescriptor* ld = getDescriptor(id, true); 1674 // if ( ld == NULL ) return; // not found? ->ignore! 1675 // logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1676 //} 1677 1678 bool BaseOverlay::onLinkRequest(const LinkID& id, 1679 const addressing2::EndpointPtr local, 1680 const addressing2::EndpointPtr remote) 1681 { 1234 1682 logging_debug("Accepting link request from " << remote->to_string() ); 1683 1684 // TODO ask application..? 1685 1235 1686 return true; 1236 1687 } 1237 1688 1689 1690 1691 1238 1692 /// handles a message from base communication 1239 bool BaseOverlay::receiveMessage(const Message* message, 1240 const LinkID& link, const NodeID& ) { 1693 bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message, 1694 const LinkID& link, 1695 const NodeID&, 1696 bool bypass_overlay ) 1697 { 1241 1698 // get descriptor for link 1242 1699 LinkDescriptor* ld = getDescriptor( link, true ); 1243 return handleMessage( message, ld, link ); 1700 1701 1702 /* choose fastpath for direct links; normal overlay-path otherwise */ 1703 if ( bypass_overlay && ld ) 1704 { 1705 // message received --> link is alive 1706 ld->keepAliveReceived = time(NULL); 1707 // hop count on this link 1708 ld->hops = 0; 1709 1710 1711 // hand over to CommunicationListener (aka Application) 1712 CommunicationListener* lst = getListener(ld->service); 1713 if ( lst != NULL ) 1714 { 1715 lst->onMessage( 1716 message, 1717 ld->remoteNode, 1718 ld->overlayId, 1719 SequenceNumber::DISABLED, 1720 NULL ); 1721 1722 return true; 1723 } 1724 1725 return false; 1726 } 1727 else 1728 { 1729 return handleMessage( message, ld, link ); 1730 } 1244 1731 } 1245 1732 … … 1247 1734 1248 1735 /// Handle spovnet instance join requests 1249 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {1250 1736 bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ) 1737 { 1251 1738 // decapsulate message 1252 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 1739 JoinRequest joinReq; 1740 joinReq.deserialize_from_shared_buffer(message); 1741 1253 1742 logging_info( "Received join request for spovnet " << 1254 joinReq ->getSpoVNetID().toString() );1743 joinReq.getSpoVNetID().toString() ); 1255 1744 1256 1745 // check spovnet id 1257 if( joinReq ->getSpoVNetID() != spovnetId ) {1746 if( joinReq.getSpoVNetID() != spovnetId ) { 1258 1747 logging_error( 1259 1748 "Received join request for spovnet we don't handle " << 1260 joinReq ->getSpoVNetID().toString() );1261 delete joinReq; 1749 joinReq.getSpoVNetID().toString() ); 1750 1262 1751 return false; 1263 1752 } … … 1267 1756 logging_info( "Sending join reply for spovnet " << 1268 1757 spovnetId.toString() << " to node " << 1269 overlayMsg->getSourceNode().toString() <<1758 source.toString() << 1270 1759 ". Result: " << (allow ? "allowed" : "denied") ); 1271 joiningNodes.push_back( overlayMsg->getSourceNode());1760 joiningNodes.push_back( source ); 1272 1761 1273 1762 // return overlay parameters … … 1276 1765 << getEndpointDescriptor().toString() ) 1277 1766 OverlayParameterSet parameters = overlayInterface->getParameters(); 1767 1768 1769 // create JoinReplay Message 1278 1770 OverlayMsg retmsg( OverlayMsg::typeJoinReply, 1279 1771 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1280 JoinReply replyMsg( spovnetId, parameters, 1281 allow, getEndpointDescriptor() ); 1282 retmsg.encapsulate(&replyMsg); 1283 bc->sendMessage( bcLink, &retmsg ); 1284 1285 delete joinReq; 1772 JoinReply replyMsg( spovnetId, parameters, allow ); 1773 retmsg.append_buffer(replyMsg.serialize_into_shared_buffer()); 1774 1775 // XXX This is unlovely clash between the old message system and the new one, 1776 // but a.t.m. we can't migrate everything to the new system at once.. 1777 // ---> Consider the EndpointDescriptor as part of the JoinReply.. 1778 retmsg.append_buffer(getEndpointDescriptor().serialize()); 1779 1780 // * send * 1781 send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY); 1782 1286 1783 return true; 1287 1784 } 1288 1785 1289 1786 /// Handle replies to spovnet instance join requests 1290 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) { 1787 bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ) 1788 { 1291 1789 // decapsulate message 1292 1790 logging_debug("received join reply message"); 1293 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 1791 JoinReply replyMsg; 1792 EndpointDescriptor endpoints; 1793 reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message); 1794 buff = endpoints.deserialize(buff); 1294 1795 1295 1796 // correct spovnet? 1296 if( replyMsg ->getSpoVNetID() != spovnetId ) { // no-> fail1797 if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail 1297 1798 logging_error( "Received SpoVNet join reply for " << 1298 replyMsg ->getSpoVNetID().toString() <<1799 replyMsg.getSpoVNetID().toString() << 1299 1800 " != " << spovnetId.toString() ); 1300 delete replyMsg; 1801 1301 1802 return false; 1302 1803 } 1303 1804 1304 1805 // access granted? no -> fail 1305 if( !replyMsg ->getJoinAllowed() ) {1806 if( !replyMsg.getJoinAllowed() ) { 1306 1807 logging_error( "Our join request has been denied" ); 1307 1808 … … 1317 1818 1318 1819 // inform all registered services of the event 1319 BOOST_FOREACH( NodeListener* i, nodeListeners )1820 foreach( NodeListener* i, nodeListeners ) 1320 1821 i->onJoinFailed( spovnetId ); 1321 1822 1322 delete replyMsg;1323 1823 return true; 1324 1824 } … … 1329 1829 1330 1830 logging_debug( "Using bootstrap end-point " 1331 << replyMsg->getBootstrapEndpoint().toString() );1831 << endpoints.toString() ); 1332 1832 1333 1833 // create overlay structure from spovnet parameter set … … 1338 1838 1339 1839 overlayInterface = OverlayFactory::create( 1340 *this, replyMsg ->getParam(), nodeId, this );1840 *this, replyMsg.getParam(), nodeId, this ); 1341 1841 1342 1842 // overlay structure supported? no-> fail! … … 1354 1854 1355 1855 // inform all registered services of the event 1356 BOOST_FOREACH( NodeListener* i, nodeListeners )1856 foreach( NodeListener* i, nodeListeners ) 1357 1857 i->onJoinFailed( spovnetId ); 1358 1858 1359 delete replyMsg;1360 1859 return true; 1361 1860 } … … 1365 1864 overlayInterface->createOverlay(); 1366 1865 1367 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint());1368 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint());1866 overlayInterface->joinOverlay( endpoints ); 1867 overlayBootstrap.recordJoin( endpoints ); 1369 1868 1370 1869 // update ovlvis … … 1372 1871 1373 1872 // inform all registered services of the event 1374 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1375 i->onJoinCompleted( spovnetId ); 1376 1377 delete replyMsg; 1378 1379 } else { 1380 1873 foreach( NodeListener* i, nodeListeners ) 1874 i->onJoinCompleted( spovnetId ); 1875 } 1876 else 1877 { 1381 1878 // this is not the first bootstrap, just join the additional node 1382 1879 logging_debug("not first-time bootstrapping"); 1383 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1384 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); 1385 1386 delete replyMsg; 1387 1880 overlayInterface->joinOverlay( endpoints ); 1881 overlayBootstrap.recordJoin( endpoints ); 1388 1882 } // if( overlayInterface == NULL ) 1389 1883 … … 1392 1886 1393 1887 1394 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1888 bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1889 { 1395 1890 // get service 1396 const ServiceID& service = overlayMsg->getService(); 1891 const ServiceID& service = ld->service; //overlayMsg->getService(); 1892 1397 1893 logging_debug( "Received data for service " << service.toString() 1398 1894 << " on link " << overlayMsg->getDestinationLink().toString() ); … … 1402 1898 if(lst != NULL){ 1403 1899 lst->onMessage( 1404 overlayMsg, 1405 overlayMsg->getSourceNode(), 1406 overlayMsg->getDestinationLink() 1900 message, 1901 // overlayMsg->getSourceNode(), 1902 // overlayMsg->getDestinationLink(), 1903 ld->remoteNode, 1904 ld->overlayId, 1905 overlayMsg->getSeqNum(), 1906 overlayMsg 1407 1907 ); 1408 1908 } … … 1411 1911 } 1412 1912 1913 bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg ) 1914 { 1915 /** 1916 * Deserialize MessageLost-Message 1917 * 1918 * - Type of lost message 1919 * - Hop count of lost message 1920 * - Source-LinkID of lost message 1921 */ 1922 const uint8_t* buff = message(0, sizeof(uint8_t)*2).data(); 1923 uint8_t type = buff[0]; 1924 uint8_t hops = buff[1]; 1925 LinkID linkid; 1926 linkid.deserialize(message(sizeof(uint8_t)*2)); 1927 1928 logging_warn("Node " << msg->getSourceNode() 1929 << " informed us, that our message of type " << (int) type 1930 << " is lost after traveling " << (int) hops << " hops." 1931 << " (LinkID: " << linkid.toString()); 1932 1933 1934 // TODO switch-case ? 1935 1936 // BRANCH: LinkRequest --> link request failed 1937 if ( type == OverlayMsg::typeLinkRequest ) 1938 { 1939 __onLinkEstablishmentFailed(linkid); 1940 } 1941 1942 // BRANCH: Data --> link disrupted. Drop link. 1943 // (We could use something more advanced here. e.g. At least send a 1944 // keep-alive message and wait for a keep-alive reply.) 1945 if ( type == OverlayMsg::typeData ) 1946 { 1947 LinkDescriptor* link_desc = getDescriptor(linkid); 1948 1949 if ( link_desc ) 1950 { 1951 link_desc->failed = true; 1952 } 1953 1954 dropLink(linkid); 1955 } 1956 1957 // BRANCH: ping lost 1958 if ( type == OverlayMsg::typePing ) 1959 { 1960 CommunicationListener* lst = getListener(msg->getService()); 1961 if( lst != NULL ) 1962 { 1963 lst->onPingLost(msg->getSourceNode()); 1964 } 1965 } 1966 1967 return true; 1968 } 1969 1970 bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1971 { 1972 // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node) 1973 1974 bool send_pong = false; 1975 1976 // inform application and ask permission to send a pong message 1977 CommunicationListener* lst = getListener(overlayMsg->getService()); 1978 if( lst != NULL ) 1979 { 1980 send_pong = lst->onPing(overlayMsg->getSourceNode()); 1981 } 1982 1983 // send pong message if allowed 1984 if ( send_pong ) 1985 { 1986 OverlayMsg pong_msg(OverlayMsg::typePong); 1987 pong_msg.setSeqNum(overlayMsg->getSeqNum()); 1988 1989 // send message 1990 try 1991 { 1992 send_node( &pong_msg, 1993 overlayMsg->getSourceNode(), 1994 system_priority::OVERLAY, 1995 overlayMsg->getService() ); 1996 } 1997 catch ( message_not_sent& e ) 1998 { 1999 logging_info("Could not send Pong-Message to node: " << 2000 overlayMsg->getSourceNode()); 2001 } 2002 } 2003 } 2004 2005 bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2006 { 2007 // inform application 2008 CommunicationListener* lst = getListener(overlayMsg->getService()); 2009 if( lst != NULL ) 2010 { 2011 lst->onPong(overlayMsg->getSourceNode()); 2012 } 2013 } 1413 2014 1414 2015 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { … … 1439 2040 overlayMsg->setSourceLink(ld->overlayId); 1440 2041 overlayMsg->setService(ld->service); 1441 send( overlayMsg, ld );2042 send( overlayMsg, ld, system_priority::OVERLAY ); 1442 2043 } 1443 2044 … … 1481 2082 if( ld->messageQueue.size() > 0 ) { 1482 2083 logging_info( "Sending out queued messages on link " << ld ); 1483 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1484 sendMessage( msg, ld->overlayId ); 1485 delete msg;1486 2084 foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue ) 2085 { 2086 sendMessage( msg.message, ld->overlayId, msg.priority ); 2087 } 1487 2088 ld->messageQueue.clear(); 1488 2089 } … … 1497 2098 /// handle a link request and reply 1498 2099 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1499 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );1500 2100 1501 2101 //TODO: Check if a request has already been sent using getSourceLink() ... … … 1514 2114 ldn->remoteNode = overlayMsg->getSourceNode(); 1515 2115 ldn->remoteLink = overlayMsg->getSourceLink(); 1516 2116 ldn->hops = overlayMsg->getNumHops(); 2117 2118 // initialize sequence numbers 2119 ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 2120 logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum); 2121 2122 1517 2123 // update time-stamps 1518 2124 ldn->setAlive(); 1519 2125 ldn->setAutoUsed(); 1520 2126 2127 logging_info( "Link request received from node id=" 2128 << overlayMsg->getSourceNode() 2129 << " LINK: " 2130 << ldn); 2131 1521 2132 // create reply message and send back! 1522 2133 overlayMsg->swapRoles(); // swap source/destination 1523 2134 overlayMsg->setType(OverlayMsg::typeLinkReply); 1524 2135 overlayMsg->setSourceLink(ldn->overlayId); 1525 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );1526 2136 overlayMsg->setRelayed(true); 1527 send( overlayMsg, ld ); // send back to link 2137 // overlayMsg->setRouteRecord(true); 2138 overlayMsg->setSeqNum(ld->last_sent_seqnum); 2139 2140 // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!! 2141 // append our endpoints (for creation of a direct link) 2142 overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize()); 2143 2144 send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link 1528 2145 1529 2146 // inform listener … … 1534 2151 } 1535 2152 1536 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1537 2153 bool BaseOverlay::handleLinkReply( 2154 OverlayMsg* overlayMsg, 2155 reboost::shared_buffer_t sub_message, 2156 LinkDescriptor* ld ) 2157 { 2158 // deserialize EndpointDescriptor 2159 EndpointDescriptor endpoints; 2160 endpoints.deserialize(sub_message); 2161 1538 2162 // find link request 1539 2163 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink()); … … 1554 2178 1555 2179 // debug message 1556 logging_ debug( "Link request reply received. Establishing link"2180 logging_info( "Link request reply received. Establishing link" 1557 2181 << " for service " << overlayMsg->getService().toString() 1558 2182 << " with local id=" << overlayMsg->getDestinationLink() 1559 2183 << " and remote link id=" << overlayMsg->getSourceLink() 1560 << " to " << overlayMsg->getSourceEndpoint().toString() 2184 << " to " << endpoints.toString() 2185 << " hop count: " << overlayMsg->getRouteRecord().size() 1561 2186 ); 1562 2187 … … 1577 2202 logging_info( "Sending out queued messages on link " << 1578 2203 ldn->overlayId.toString() ); 1579 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {1580 sendMessage( msg, ldn->overlayId );1581 delete msg;2204 foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue ) 2205 { 2206 sendMessage( msg.message, ldn->overlayId, msg.priority ); 1582 2207 } 1583 2208 ldn->messageQueue.clear(); … … 1589 2214 // try to replace relay link with direct link 1590 2215 ldn->retryCounter = 3; 1591 ldn->endpoint = overlayMsg->getSourceEndpoint();2216 ldn->endpoint = endpoints; 1592 2217 ldn->communicationId = bc->establishLink( ldn->endpoint ); 1593 2218 … … 1596 2221 1597 2222 /// handle a keep-alive message for a link 1598 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 2223 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2224 { 1599 2225 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink()); 1600 if ( rld != NULL ) { 1601 logging_debug("Keep-Alive for " << 1602 overlayMsg->getDestinationLink() ); 2226 2227 if ( rld != NULL ) 2228 { 2229 logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() ); 1603 2230 if (overlayMsg->isRouteRecord()) 2231 { 1604 2232 rld->routeRecord = overlayMsg->getRouteRecord(); 2233 } 2234 2235 // set alive 1605 2236 rld->setAlive(); 2237 2238 2239 /* answer keep alive */ 2240 if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive ) 2241 { 2242 time_t now = time(NULL); 2243 logging_debug("[BaseOverlay] Answering KeepAlive over " 2244 << ld->to_string() 2245 << " after " 2246 << difftime( now, ld->keepAliveSent ) 2247 << "s"); 2248 2249 OverlayMsg msg( OverlayMsg::typeKeepAliveReply, 2250 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 2251 msg.setRouteRecord(true); 2252 ld->keepAliveSent = now; 2253 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 2254 } 2255 1606 2256 return true; 1607 } else { 1608 logging_error("Keep-Alive for " 2257 } 2258 else 2259 { 2260 logging_error("No Keep-Alive for " 1609 2261 << overlayMsg->getDestinationLink() << ": link unknown." ); 1610 2262 return false; … … 1636 2288 // erase the original descriptor 1637 2289 eraseDescriptor(ld->overlayId); 2290 2291 // inform listener 2292 if( rld->listener != NULL) 2293 rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode ); 2294 1638 2295 return true; 1639 2296 } 1640 2297 1641 2298 /// handles an incoming message 1642 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld, 1643 const LinkID bcLink ) { 1644 logging_debug( "Handling message: " << message->toString()); 1645 2299 bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, 2300 const LinkID bcLink ) 2301 { 1646 2302 // decapsulate overlay message 1647 OverlayMsg* overlayMsg = 1648 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1649 if( overlayMsg == NULL ) return false; 1650 2303 OverlayMsg* overlayMsg = new OverlayMsg(); 2304 reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message); 2305 2306 // // XXX debug 2307 // logging_info( "Received overlay message." 2308 // << " Hops: " << (int) overlayMsg->getNumHops() 2309 // << " Type: " << (int) overlayMsg->getType() 2310 // << " Payload size: " << sub_buff.size() 2311 // << " SeqNum: " << overlayMsg->getSeqNum() ); 2312 2313 1651 2314 // increase number of hops 1652 2315 overlayMsg->increaseNumHops(); … … 1660 2323 // handle signaling messages (do not route!) 1661 2324 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && 1662 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { 1663 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 2325 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) 2326 { 2327 overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 1664 2328 delete overlayMsg; 1665 2329 return true; … … 1673 2337 << " to " << overlayMsg->getDestinationNode() 1674 2338 ); 1675 route( overlayMsg ); 2339 2340 // // XXX testing AKTUELL 2341 // logging_info("MARIO: Routing message " 2342 // << " from " << overlayMsg->getSourceNode() 2343 // << " to " << overlayMsg->getDestinationNode() ); 2344 // logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size()); 2345 overlayMsg->append_buffer(sub_buff); 2346 2347 route( overlayMsg, ld->remoteNode ); 1676 2348 delete overlayMsg; 1677 2349 return true; 1678 2350 } 1679 2351 1680 // handle base overlay message 2352 2353 /* handle base overlay message */ 1681 2354 bool ret = false; // return value 1682 switch ( overlayMsg->getType() ) { 1683 1684 // data transport messages 1685 case OverlayMsg::typeData: 1686 ret = handleData(overlayMsg, ld); break; 1687 1688 // overlay setup messages 1689 case OverlayMsg::typeJoinRequest: 1690 ret = handleJoinRequest(overlayMsg, bcLink ); break; 1691 case OverlayMsg::typeJoinReply: 1692 ret = handleJoinReply(overlayMsg, bcLink ); break; 1693 1694 // link specific messages 1695 case OverlayMsg::typeLinkRequest: 1696 ret = handleLinkRequest(overlayMsg, ld ); break; 1697 case OverlayMsg::typeLinkReply: 1698 ret = handleLinkReply(overlayMsg, ld ); break; 1699 case OverlayMsg::typeLinkUpdate: 1700 ret = handleLinkUpdate(overlayMsg, ld ); break; 1701 case OverlayMsg::typeLinkAlive: 1702 ret = handleLinkAlive(overlayMsg, ld ); break; 1703 case OverlayMsg::typeLinkDirect: 1704 ret = handleLinkDirect(overlayMsg, ld ); break; 1705 1706 // handle unknown message type 1707 default: { 1708 logging_error( "received message in invalid state! don't know " << 1709 "what to do with this message of type " << overlayMsg->getType() ); 1710 ret = false; 1711 break; 1712 } 2355 try 2356 { 2357 switch ( overlayMsg->getType() ) 2358 { 2359 // data transport messages 2360 case OverlayMsg::typeData: 2361 { 2362 // NOTE: On relayed links, »ld« does not point to our link, but on the relay link. 2363 LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink()); 2364 2365 if ( ! end_to_end_ld ) 2366 { 2367 logging_warn("Error: Data-Message claims to belong to a link we don't know."); 2368 2369 ret = false; 2370 } 2371 else 2372 { 2373 // message received --> link is alive 2374 end_to_end_ld->keepAliveReceived = time(NULL); 2375 // hop count on this link 2376 end_to_end_ld->hops = overlayMsg->getNumHops(); 2377 2378 // * call handler * 2379 ret = handleData(sub_buff, overlayMsg, end_to_end_ld); 2380 } 2381 2382 break; 2383 } 2384 case OverlayMsg::typeMessageLost: 2385 ret = handleLostMessage(sub_buff, overlayMsg); 2386 2387 break; 2388 2389 // overlay setup messages 2390 case OverlayMsg::typeJoinRequest: 2391 ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break; 2392 case OverlayMsg::typeJoinReply: 2393 ret = handleJoinReply(sub_buff, bcLink ); break; 2394 2395 // link specific messages 2396 case OverlayMsg::typeLinkRequest: 2397 ret = handleLinkRequest(overlayMsg, ld ); break; 2398 case OverlayMsg::typeLinkReply: 2399 ret = handleLinkReply(overlayMsg, sub_buff, ld ); break; 2400 case OverlayMsg::typeLinkUpdate: 2401 ret = handleLinkUpdate(overlayMsg, ld ); break; 2402 case OverlayMsg::typeKeepAlive: 2403 case OverlayMsg::typeKeepAliveReply: 2404 ret = handleLinkAlive(overlayMsg, ld ); break; 2405 case OverlayMsg::typeLinkDirect: 2406 ret = handleLinkDirect(overlayMsg, ld ); break; 2407 2408 case OverlayMsg::typeLinkClose: 2409 { 2410 dropLink(overlayMsg->getDestinationLink()); 2411 __removeDroppedLink(overlayMsg->getDestinationLink()); 2412 2413 break; 2414 } 2415 2416 /// ping over overlay path (or similar) 2417 case OverlayMsg::typePing: 2418 { 2419 ret = handlePing(overlayMsg, ld); 2420 break; 2421 } 2422 case OverlayMsg::typePong: 2423 { 2424 ret = handlePong(overlayMsg, ld); 2425 break; 2426 } 2427 2428 // handle unknown message type 2429 default: 2430 { 2431 logging_error( "received message in invalid state! don't know " << 2432 "what to do with this message of type " << overlayMsg->getType() ); 2433 ret = false; 2434 break; 2435 } 2436 } 2437 } 2438 catch ( reboost::illegal_sub_buffer& e ) 2439 { 2440 logging_error( "Failed to create sub-buffer while reading message: »" 2441 << e.what() 2442 << "« Message too short? "); 2443 2444 assert(false); // XXX 1713 2445 } 1714 2446 … … 1720 2452 // ---------------------------------------------------------------------------- 1721 2453 1722 void BaseOverlay::broadcastMessage( Message* message, const ServiceID& service) {2454 void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) { 1723 2455 1724 2456 logging_debug( "broadcasting message to all known nodes " << 1725 2457 "in the overlay from service " + service.toString() ); 1726 1727 if(message == NULL) return;1728 message->setReleasePayload(false);1729 2458 1730 2459 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true); … … 1732 2461 NodeID& id = nodes.at(i); 1733 2462 if(id == this->nodeId) continue; // don't send to ourselfs 1734 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send 1735 sendMessage( message, id, service );2463 2464 sendMessage( message, id, priority, service ); 1736 2465 } 1737 2466 } … … 1755 2484 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const { 1756 2485 vector<LinkID> linkvector; 1757 BOOST_FOREACH( LinkDescriptor* ld, links ) {2486 foreach( LinkDescriptor* ld, links ) { 1758 2487 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { 1759 2488 linkvector.push_back( ld->overlayId ); … … 1779 2508 updateVisual(); 1780 2509 } 2510 2511 2512 2513 /* link status */ 2514 bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const 2515 { 2516 const LinkDescriptor* ld = getDescriptor(lnk); 2517 2518 if (!ld) 2519 return false; 2520 2521 return ld->communicationUp && !ld->relayed; 2522 } 2523 2524 int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const 2525 { 2526 const LinkDescriptor* ld = getDescriptor(lnk); 2527 2528 if (!ld) 2529 return -1; 2530 2531 return ld->hops; 2532 } 2533 2534 2535 bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const 2536 { 2537 time_t now = time(NULL); 2538 2539 return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..? 2540 } 2541 2542 bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const 2543 { 2544 return isLinkVital(link) && link->communicationUp && !link->relayed; 2545 } 2546 2547 /* [link status] */ 2548 1781 2549 1782 2550 void BaseOverlay::updateVisual(){ … … 1878 2646 static set<NodeID> linkset; 1879 2647 set<NodeID> remotenodes; 1880 BOOST_FOREACH( LinkDescriptor* ld, links ) {1881 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)2648 foreach( LinkDescriptor* ld, links ) { 2649 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) 1882 2650 continue; 1883 2651 … … 1895 2663 do{ 1896 2664 changed = false; 1897 BOOST_FOREACH(NodeID n, linkset){2665 foreach(NodeID n, linkset){ 1898 2666 if(remotenodes.find(n) == remotenodes.end()){ 1899 2667 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, ""); … … 1908 2676 do{ 1909 2677 changed = false; 1910 BOOST_FOREACH(NodeID n, remotenodes){2678 foreach(NodeID n, remotenodes){ 1911 2679 if(linkset.find(n) == linkset.end()){ 1912 2680 visualInstance.visConnect(visualIdBase, this->nodeId, n, ""); … … 1933 2701 // dump link state 1934 2702 s << "--- link state -------------------------------" << endl; 1935 BOOST_FOREACH( LinkDescriptor* ld, links ) {2703 foreach( LinkDescriptor* ld, links ) { 1936 2704 s << "link " << i << ": " << ld << endl; 1937 2705 i++; -
source/ariba/overlay/BaseOverlay.h